contexa-core
AI Pipeline
AI Pipeline은 pipeline-backed AI 요청을 위한 설정 가능한 다단계 실행 흐름을 제공합니다. PipelineOrchestrator는 설정에 따라 적절한 PipelineExecutor를 선택하고, 컨텍스트 검색부터 구조화된 응답 파싱까지 전체 수명 주기를 조정합니다.
개요
Pipeline은 직접적인 단일 LLM 호출이 아니라 전체 처리 흐름 (RAG 컨텍스트 검색, 프롬프트 생성, LLM 실행, 응답 파싱)이 필요한 사용 사례를 위해 설계되었습니다. 오케스트레이터는 표준 실행기와 Streaming 실행기 사이에서 자동으로 선택하며, 필요하면 SOAR_TOOL_EXECUTION 단계를 명시적으로 추가할 수 있습니다.
Pipeline 흐름
CONTEXT_RETRIEVAL
PREPROCESSING
PROMPT_GENERATION
LLM_EXECUTION
RESPONSE_PARSING
POSTPROCESSING
PipelineOrchestrator
주어진 설정에 따라 적절한 PipelineExecutor를 선택하고 위임하는 중앙 조정자입니다.
public class PipelineOrchestrator
execute(AIRequest<T> request, Class<R> responseType)
Mono<R>
기본 설정 (
createPipelineConfig())으로 Pipeline을 실행합니다. 실행기를 선택하고 Pipeline을 실행한 후 타입이 지정된 응답을 반환합니다.
execute(AIRequest<T> request, PipelineConfiguration config, Class<R> responseType)
Mono<R>
커스텀 설정으로 Pipeline을 실행합니다. 실패 시 자동 폴백 응답 생성을 포함합니다.
executeStream(AIRequest<T> request)
Flux<String>
기본 Streaming 설정으로 Pipeline을 Streaming 모드로 실행합니다.
executeStream(AIRequest<T> request, PipelineConfiguration config)
Flux<String>
커스텀 설정으로 Pipeline을 Streaming 모드로 실행합니다. Streaming 가능한 실행기를 선택합니다.
PipelineConfiguration
Pipeline 실행을 위한 단계, 파라미터, 타임아웃, Streaming 모드를 정의합니다.
public class PipelineConfiguration
| 속성 | 타입 | 기본값 | 설명 |
|---|---|---|---|
steps | List<PipelineStep> | 빈 값 | 실행할 Pipeline 단계의 정렬된 목록. |
parameters | Map<String, Object> | 빈 값 | 단계 설정을 위한 임의의 키-값 파라미터. |
timeoutSeconds | int | 300 | 최대 실행 시간 (초). |
enableStreaming | boolean | false | Streaming 실행 모드 사용 여부. |
name | String | null | 식별을 위한 선택적 Pipeline 이름. |
description | String | null | Pipeline 목적에 대한 선택적 설명. |
PipelineStep Enum
| 단계 | 설명 |
|---|---|
PREPROCESSING | 기본 요청 메타데이터 문자열을 구성합니다. 검증이나 정규화는 같은 단계에 커스텀 스텝으로 추가할 수 있습니다. |
CONTEXT_RETRIEVAL | 벡터 스토어에서 RAG 컨텍스트를 검색합니다. 전용 retriever가 없으면 기본 retriever로 폴백하고, 일치 문서가 없어도 빈/저신호 컨텍스트로 다음 단계가 계속 진행됩니다. |
PROMPT_GENERATION | 요청 데이터와 검색된 컨텍스트로 최종 프롬프트를 구성합니다. |
LLM_EXECUTION | UnifiedLLMOrchestrator를 통해 프롬프트를 실행합니다. |
SOAR_TOOL_EXECUTION | 선택적 도구 실행 단계입니다. 기본 파이프라인에는 포함되지 않으며, 명시적으로 추가할 때만 LLM 결과와 응답 파싱 사이에 실행됩니다. |
RESPONSE_PARSING | 원시 LLM 출력을 대상 응답 타입으로 파싱합니다. |
POSTPROCESSING | 최종 보강, 검증, 후처리를 수행합니다. |
팩토리 메서드
createPipelineConfig()
PipelineConfiguration
기본 Pipeline 설정을 생성합니다. 단계: CONTEXT_RETRIEVAL, PREPROCESSING, PROMPT_GENERATION, LLM_EXECUTION, RESPONSE_PARSING, POSTPROCESSING. 타임아웃: 300초.
createStreamPipelineConfig()
PipelineConfiguration
Streaming Pipeline 설정을 생성합니다. 단계: CONTEXT_RETRIEVAL, PREPROCESSING, PROMPT_GENERATION, LLM_EXECUTION. Streaming 활성화, 타임아웃: 300초.
선택적 SOAR 단계: PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION는 기본 팩토리 메서드에 포함되지 않습니다. 필요할 때만 수동으로 설정에 추가하고, LLM_EXECUTION과 RESPONSE_PARSING 사이에 배치해야 합니다.
PipelineExecutor
구체적인 실행기가 Pipeline 설정을 처리하기 위해 구현하는 인터페이스입니다.
public interface PipelineExecutor
execute(AIRequest<T> request, PipelineConfiguration config, Class<R> responseType)
Mono<R>
Pipeline 단계를 순서대로 실행하고 타입이 지정된 응답을 반환합니다.
executeStream(AIRequest<T> request, PipelineConfiguration config)
Flux<String>
Pipeline을 Streaming 모드로 실행합니다.
supportsConfiguration(PipelineConfiguration config)
boolean
이 실행기가 주어진 설정을 처리할 수 있는지 여부를 반환합니다.
supportsStreaming()
boolean
이 실행기가 Streaming을 지원하는지 여부를 반환합니다. 오케스트레이터의 실행기 선택에 사용됩니다.
getPriority()
int
실행기 우선순위를 반환합니다. 낮은 값이 우선됩니다. 기본값: 100.
내장 구현체
| 클래스 | Streaming | 설명 |
|---|---|---|
UniversalPipelineExecutor | 아니오 | 표준 동기/비동기 Pipeline 실행기. 모든 단계를 순차적으로 처리하고 최종 응답을 반환합니다. |
StreamingUniversalPipelineExecutor | 예 | 리액티브 Flux<String>으로 LLM 출력을 전달하는 Streaming 가능 실행기. |
코드 예제
표준 Pipeline 실행
Java
AIRequest<SecurityContext> request = new AIRequest<>(
securityContext,
new TemplateType("SecurityDecisionStandard"),
new DiagnosisType("SecurityDecision")
);
request.setNaturalLanguageQuery("Evaluate the current request context");
ThreatResponse response = pipelineOrchestrator
.execute(request, ThreatResponse.class)
.block();
커스텀 Pipeline 설정
Java
PipelineConfiguration config = PipelineConfiguration.builder()
.addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
.addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
.addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
.addStep(PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION)
.addStep(PipelineConfiguration.PipelineStep.RESPONSE_PARSING)
.timeoutSeconds(120)
.build();
SecurityResponse response = pipelineOrchestrator
.execute(request, config, SecurityResponse.class)
.block();
Streaming Pipeline
Java
Flux<String> chunks = pipelineOrchestrator
.executeStream(request);
chunks.subscribe(
chunk -> sendSseEvent(chunk),
error -> handleError(error),
() -> completeStream()
);
관련 문서
설정 레퍼런스
Pipeline
Pipeline
application.yml 속성은 보안 설정을 참조하세요 — SecurityPipelineProperties와 SecurityColdPathProperties를 다룹니다.