contexa-core

AI Pipeline

AI Pipeline은 pipeline-backed AI 요청을 위한 설정 가능한 다단계 실행 흐름을 제공합니다. PipelineOrchestrator는 설정에 따라 적절한 PipelineExecutor를 선택하고, 컨텍스트 검색부터 구조화된 응답 파싱까지 전체 수명 주기를 조정합니다.

개요

Pipeline은 직접적인 단일 LLM 호출이 아니라 전체 처리 흐름 (RAG 컨텍스트 검색, 프롬프트 생성, LLM 실행, 응답 파싱)이 필요한 사용 사례를 위해 설계되었습니다. 오케스트레이터는 표준 실행기와 Streaming 실행기 사이에서 자동으로 선택하며, 필요하면 SOAR_TOOL_EXECUTION 단계를 명시적으로 추가할 수 있습니다.

Pipeline 흐름

CONTEXT_RETRIEVAL
PREPROCESSING
PROMPT_GENERATION
LLM_EXECUTION
RESPONSE_PARSING
POSTPROCESSING

PipelineOrchestrator

주어진 설정에 따라 적절한 PipelineExecutor를 선택하고 위임하는 중앙 조정자입니다.

public class PipelineOrchestrator
execute(AIRequest<T> request, Class<R> responseType) Mono<R>
기본 설정 (createPipelineConfig())으로 Pipeline을 실행합니다. 실행기를 선택하고 Pipeline을 실행한 후 타입이 지정된 응답을 반환합니다.
execute(AIRequest<T> request, PipelineConfiguration config, Class<R> responseType) Mono<R>
커스텀 설정으로 Pipeline을 실행합니다. 실패 시 자동 폴백 응답 생성을 포함합니다.
executeStream(AIRequest<T> request) Flux<String>
기본 Streaming 설정으로 Pipeline을 Streaming 모드로 실행합니다.
executeStream(AIRequest<T> request, PipelineConfiguration config) Flux<String>
커스텀 설정으로 Pipeline을 Streaming 모드로 실행합니다. Streaming 가능한 실행기를 선택합니다.

PipelineConfiguration

Pipeline 실행을 위한 단계, 파라미터, 타임아웃, Streaming 모드를 정의합니다.

public class PipelineConfiguration
속성타입기본값설명
stepsList<PipelineStep>빈 값실행할 Pipeline 단계의 정렬된 목록.
parametersMap<String, Object>빈 값단계 설정을 위한 임의의 키-값 파라미터.
timeoutSecondsint300최대 실행 시간 (초).
enableStreamingbooleanfalseStreaming 실행 모드 사용 여부.
nameStringnull식별을 위한 선택적 Pipeline 이름.
descriptionStringnullPipeline 목적에 대한 선택적 설명.

PipelineStep Enum

단계설명
PREPROCESSING기본 요청 메타데이터 문자열을 구성합니다. 검증이나 정규화는 같은 단계에 커스텀 스텝으로 추가할 수 있습니다.
CONTEXT_RETRIEVAL벡터 스토어에서 RAG 컨텍스트를 검색합니다. 전용 retriever가 없으면 기본 retriever로 폴백하고, 일치 문서가 없어도 빈/저신호 컨텍스트로 다음 단계가 계속 진행됩니다.
PROMPT_GENERATION요청 데이터와 검색된 컨텍스트로 최종 프롬프트를 구성합니다.
LLM_EXECUTIONUnifiedLLMOrchestrator를 통해 프롬프트를 실행합니다.
SOAR_TOOL_EXECUTION선택적 도구 실행 단계입니다. 기본 파이프라인에는 포함되지 않으며, 명시적으로 추가할 때만 LLM 결과와 응답 파싱 사이에 실행됩니다.
RESPONSE_PARSING원시 LLM 출력을 대상 응답 타입으로 파싱합니다.
POSTPROCESSING최종 보강, 검증, 후처리를 수행합니다.

팩토리 메서드

createPipelineConfig() PipelineConfiguration
기본 Pipeline 설정을 생성합니다. 단계: CONTEXT_RETRIEVAL, PREPROCESSING, PROMPT_GENERATION, LLM_EXECUTION, RESPONSE_PARSING, POSTPROCESSING. 타임아웃: 300초.
createStreamPipelineConfig() PipelineConfiguration
Streaming Pipeline 설정을 생성합니다. 단계: CONTEXT_RETRIEVAL, PREPROCESSING, PROMPT_GENERATION, LLM_EXECUTION. Streaming 활성화, 타임아웃: 300초.

선택적 SOAR 단계: PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION는 기본 팩토리 메서드에 포함되지 않습니다. 필요할 때만 수동으로 설정에 추가하고, LLM_EXECUTIONRESPONSE_PARSING 사이에 배치해야 합니다.

PipelineExecutor

구체적인 실행기가 Pipeline 설정을 처리하기 위해 구현하는 인터페이스입니다.

public interface PipelineExecutor
execute(AIRequest<T> request, PipelineConfiguration config, Class<R> responseType) Mono<R>
Pipeline 단계를 순서대로 실행하고 타입이 지정된 응답을 반환합니다.
executeStream(AIRequest<T> request, PipelineConfiguration config) Flux<String>
Pipeline을 Streaming 모드로 실행합니다.
supportsConfiguration(PipelineConfiguration config) boolean
이 실행기가 주어진 설정을 처리할 수 있는지 여부를 반환합니다.
supportsStreaming() boolean
이 실행기가 Streaming을 지원하는지 여부를 반환합니다. 오케스트레이터의 실행기 선택에 사용됩니다.
getPriority() int
실행기 우선순위를 반환합니다. 낮은 값이 우선됩니다. 기본값: 100.

내장 구현체

클래스Streaming설명
UniversalPipelineExecutor아니오표준 동기/비동기 Pipeline 실행기. 모든 단계를 순차적으로 처리하고 최종 응답을 반환합니다.
StreamingUniversalPipelineExecutor리액티브 Flux<String>으로 LLM 출력을 전달하는 Streaming 가능 실행기.

코드 예제

표준 Pipeline 실행

Java
AIRequest<SecurityContext> request = new AIRequest<>(
        securityContext,
        new TemplateType("SecurityDecisionStandard"),
        new DiagnosisType("SecurityDecision")
);
request.setNaturalLanguageQuery("Evaluate the current request context");

ThreatResponse response = pipelineOrchestrator
        .execute(request, ThreatResponse.class)
        .block();

커스텀 Pipeline 설정

Java
PipelineConfiguration config = PipelineConfiguration.builder()
        .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
        .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
        .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
        .addStep(PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION)
        .addStep(PipelineConfiguration.PipelineStep.RESPONSE_PARSING)
        .timeoutSeconds(120)
        .build();

SecurityResponse response = pipelineOrchestrator
        .execute(request, config, SecurityResponse.class)
        .block();

Streaming Pipeline

Java
Flux<String> chunks = pipelineOrchestrator
        .executeStream(request);

chunks.subscribe(
        chunk -> sendSseEvent(chunk),
        error -> handleError(error),
        () -> completeStream()
);
설정 레퍼런스
Pipeline application.yml 속성은 보안 설정을 참조하세요 — SecurityPipelinePropertiesSecurityColdPathProperties를 다룹니다.