Pipeline 및 RAG
AI Pipeline은 RAG로 관련 컨텍스트 조회부터 구조화된 LLM 응답 파싱까지 설정 가능한 단계 시퀀스를 통해 pipeline 기반 AI 요청을 처리합니다. PipelineOrchestrator가 적절한 실행기를 선택하고 전체 수명 주기를 관리합니다.
개요: 기본 6단계 Pipeline
Lab이 orchestrator.execute() 또는 orchestrator.executeStream()을 호출하면 기본 구성은 6개 처리 단계를 거칩니다. 런타임은 또한 pipeline이 명시적으로 추가할 때 LLM_EXECUTION과 RESPONSE_PARSING 사이에 삽입할 수 있는 선택적 SOAR_TOOL_EXECUTION 단계를 정의합니다.
각 단계는 PipelineStep 빈으로 구현됩니다. 오케스트레이터가 PipelineExecutor(표준 또는 스트리밍)를 선택하고 구성된 단계를 순서대로 실행합니다. createPipelineConfig()는 기본적으로 선택적 SOAR 단계를 포함하지 않지만, enum과 실행기 모두 이를 지원합니다.
단계별 상세 분석
1. CONTEXT_RETRIEVAL
벡터 스토어에서 관련 컨텍스트를 조회해 도메인별 지식으로 LLM 프롬프트를 보강합니다.
- 컴포넌트:
ContextRetriever(또는 사용자의 커스텀 하위 클래스) - 입력: 도메인 컨텍스트와 선택적 자연어 질의를 가진
AIRequest - 출력: 조회된 컨텍스트 문자열, 일치 문서, 메타데이터를 담은
ContextRetrievalResult - 동작: retriever가 요청에서 검색 질의를 추출해 벡터 스토어에 유사도 검색을 수행하고, 일치 문서를 컨텍스트 문자열로 형식화합니다. 이 문자열은 프롬프트 템플릿에
contextInfo로 전달됩니다.
ContextRetrieverRegistry는 현재 컨텍스트 타입에 대한 retriever를 해결하고, 특정 등록이 없을 때 기본 retriever로 폴백합니다. 검색이 일치 결과를 반환하지 못하면 다운스트림 프롬프트는 빈 또는 약한 신호 컨텍스트 문자열로 그대로 실행됩니다.
2. PREPROCESSING
프롬프트 생성 전에 기본 시스템 메타데이터를 수집합니다.
- 동작: 내장
PreprocessingStep이 요청 ID, 요청 타입, 컨텍스트 타입, 처리 시간을 담은 형식화된 메타데이터 문자열을 저장합니다. 커스텀 단계는 동일한PREPROCESSING단계를 대상으로 검증·정규화 로직을 추가할 수 있습니다. - 커스터마이징:
PipelineStep인터페이스를 구현해 자체 전처리 로직을 추가할 수 있습니다 (아래 커스텀 Pipeline 단계 참조).
3. PROMPT_GENERATION
도메인 시스템 프롬프트, 사용자 프롬프트, 조회된 컨텍스트로 최종 프롬프트를 구성합니다.
- 컴포넌트: 사용자의
PromptTemplate구현 (TemplateType으로 매칭) - 동작: 프레임워크가 일치하는 템플릿의
generateSystemPrompt()와generateUserPrompt()를 호출합니다. 표준 템플릿은 JSON 형식 지시문이 시스템 프롬프트에 자동 추가되며, 스트리밍 템플릿은 스트리밍 프로토콜 지시문과 JSON 스키마 예제가 주입됩니다. - 결과: 시스템·사용자 메시지를 가진 Spring AI
Prompt객체로, LLM 실행 준비 완료 상태입니다.
4. LLM_EXECUTION
구성된 프롬프트를 UnifiedLLMOrchestrator를 통해 LLM에 전송합니다.
- 컴포넌트:
UnifiedLLMOrchestrator - 동작: 오케스트레이터가 요청의 tier 또는 분석 수준을 기반으로 모델을 선택하고, 등록된 Advisor를 적용하며, 프롬프트를 LLM에 전송합니다. 표준 모드에서는 전체 텍스트 응답을 반환하고, 스트리밍 모드에서는 청크의
Flux<String>을 반환합니다. - 설정: 모델 선택과 tier 타임아웃은
spring.ai.security·spring.ai.security.tiered로 구성합니다. 스트리밍과 검색 동작은contexa.streaming·contexa.rag로 별도 설정합니다. LLM 및 모델을 참조하세요.
5. SOAR_TOOL_EXECUTION (선택)
Pipeline 구성에 SOAR 단계가 포함될 때 응답 파싱 전 LLM 후 tool 기반 액션을 실행합니다.
- 구성 단계:
PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION - 동작: 표준 실행기는 LLM 결과를 파싱 전에 tool 실행 단계로 전달할 수 있습니다.
ResponseParsingStep이 이 단계를 먼저 확인하고, SOAR 단계가 없으면 원시LLM_EXECUTION결과로 폴백합니다. - 기본 동작: 내장 기본 pipeline 구성은 이 단계를 자동 추가하지 않습니다.
6. RESPONSE_PARSING
원시 LLM 텍스트 출력을 타입이 지정된 응답 객체로 파싱합니다.
- 동작: 프레임워크가 LLM 응답에서 JSON을 추출하고(마크다운 코드 블록·여분 텍스트 등 처리),
BeanOutputConverter를 사용해AIResponse하위 클래스로 역직렬화합니다. JSON 추출에 실패하면 폴백 파서가 괄호 균형 맞춤·복구를 시도합니다. - 스트리밍 참고: 이 단계는 스트리밍 모드에서 사용되지 않습니다. 대신
JsonStreamingProcessor가 프로토콜 마커를 사용해 스트림에서 JSON 추출을 처리합니다.
7. POSTPROCESSING
파싱된 응답에 대한 최종 보강·검증·도메인별 처리.
- 컴포넌트:
DomainResponseProcessor(선택, 커스텀) - 동작: 요청의 템플릿 키를 지원하는
DomainResponseProcessor가 등록되어 있다면, 그wrapResponse()메서드가 호출되어 파싱된 응답을 변환·보강합니다. 실행 메타데이터(타이밍·요청 ID)도 첨부됩니다. - 스트리밍 참고: 이 단계는 스트리밍 모드에서 사용되지 않습니다.
표준 vs 스트리밍 Pipeline
Pipeline은 PipelineConfiguration에 따라 두 가지 모드로 동작합니다:
| 측면 | 표준 Pipeline | 스트리밍 Pipeline |
|---|---|---|
| 단계 | 기본 6단계, 명시적 구성 시 선택적 SOAR_TOOL_EXECUTION 포함 | 처음 4단계 (LLM_EXECUTION까지) |
| 실행기 | UniversalPipelineExecutor | StreamingUniversalPipelineExecutor |
| 반환 타입 | Mono<R> (타입 응답) | Flux<String> (청크 스트림) |
| JSON 처리 | RESPONSE_PARSING의 BeanOutputConverter | 프로토콜 마커가 있는 JsonStreamingProcessor |
| 구성 플래그 | enableStreaming(false) (기본값) | enableStreaming(true) |
| 팩토리 메서드 | PipelineConfiguration.createPipelineConfig() | PipelineConfiguration.createStreamPipelineConfig() |
표준 Pipeline 구성
// Default 6-step pipeline, no streaming
PipelineConfiguration config = PipelineConfiguration.builder()
.addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
.addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
.addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
.addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
.addStep(PipelineConfiguration.PipelineStep.RESPONSE_PARSING)
.addStep(PipelineConfiguration.PipelineStep.POSTPROCESSING)
.timeoutSeconds(300)
.build();
Mono<MyResponse> result = orchestrator.execute(
aiRequest, config, MyResponse.class);
스트리밍 Pipeline 구성
// First 4 steps, streaming enabled
PipelineConfiguration config = PipelineConfiguration.builder()
.addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
.addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
.addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
.addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
.enableStreaming(true)
.timeoutSeconds(300)
.build();
Flux<String> chunks = orchestrator.executeStream(
aiRequest, config);
내장 팩토리 메서드를 사용할 수도 있습니다:
// Equivalent to the manual configurations above
PipelineConfiguration.createPipelineConfig(); // standard
PipelineConfiguration.createStreamPipelineConfig(); // streaming
선택적 SOAR 단계가 필요하면 구성을 수동으로 빌드하고 LLM_EXECUTION과 RESPONSE_PARSING 사이에 PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION을 추가합니다.
RAG 통합 가이드
RAG(Retrieval-Augmented Generation)는 데이터에서 관련 컨텍스트를 제공하여 LLM 응답을 향상시킵니다. ContextRetriever가 사용 가능할 때 CONTEXT_RETRIEVAL 단계가 이를 자동 처리합니다.
Pipeline에서 RAG 동작 방식
- CONTEXT_RETRIEVAL 단계가 사용자의
ContextRetriever.retrieveContext()를 호출합니다. - retriever가 요청 질의와 유사한 문서를 벡터 스토어에서 검색합니다.
- 일치 문서가 컨텍스트 문자열로 형식화됩니다.
- 컨텍스트 문자열이 PROMPT_GENERATION 단계의
generateUserPrompt(request, contextInfo)에 전달됩니다. - 프롬프트 템플릿이 LLM에 전송되는 사용자 프롬프트에 이 컨텍스트를 포함합니다.
ContextRetriever 구현
@Component
public class MyContextRetriever extends ContextRetriever {
public MyContextRetriever(
VectorStore vectorStore,
ContexaRagProperties ragProperties) {
super(vectorStore, ragProperties);
}
@Override
public ContextRetrievalResult retrieveContext(
AIRequest<? extends DomainContext> request) {
// Extract search query from your domain context
String query = extractQueryFromRequest(request);
// Search the vector store via the VectorOperations facade
List<Document> docs = vectorOperations
.searchSimilar(query);
// Format results as context string
String contextInfo = docs.stream()
.map(doc -> doc.getText())
.collect(Collectors.joining("\n\n"));
return new ContextRetrievalResult(
contextInfo, docs,
Map.of("retrieverType", "custom",
"docCount", docs.size()));
}
}
RAG용 문서 저장
검색이 작동하려면 먼저 벡터 스토어에 문서가 있어야 합니다. Spring AI의 VectorStore 인터페이스를 사용해 문서를 저장합니다:
// Store documents with metadata
Document doc = new Document(
"Product A is a lightweight laptop ideal for travel",
Map.of("category", "electronics",
"productId", "PROD-001"));
vectorStore.add(List.of(doc));
기본 retriever는 구성된 Spring AI
VectorStore와 함께 작동합니다. PgVector가 연결되면 아래 RAG 기본값을 사용하고 벡터 스토어 연결을 별도로 구성합니다:
contexa:
rag:
defaults:
top-k: 10
similarity-threshold: 0.7
모든 RAG 속성은 AI 설정을 참조하세요.
커스텀 Pipeline 단계
PipelineStep 인터페이스를 구현해 어느 단계에든 커스텀 처리를 추가할 수 있습니다. 커스텀 단계는 Spring 빈으로 자동 발견되어 getConfigStep()과 getOrder()를 기반으로 pipeline에 삽입됩니다.
@Component
public class InputNormalizationStep implements PipelineStep {
@Override
public <T extends DomainContext> Mono<Object> execute(
AIRequest<T> request,
PipelineExecutionContext context) {
// Normalize whitespace, trim input, add metadata
String normalized = request.getContext()
.getDomainType().trim().toUpperCase();
context.addMetadata("normalizedType", normalized);
context.addMetadata("processedAt",
Instant.now().toString());
return Mono.just(request);
}
@Override
public PipelineConfiguration.PipelineStep getConfigStep() {
// Runs during the PREPROCESSING phase
return PipelineConfiguration.PipelineStep.PREPROCESSING;
}
@Override
public int getOrder() {
// Lower order = runs first within the phase
// Default steps run at order 100
return 50;
}
}
여러 커스텀 단계가 동일한 pipeline 단계를 대상으로 할 수 있습니다. getOrder() 순서로 실행되며, 낮은 값이 먼저 실행됩니다.
커스텀 응답 프로세서
POSTPROCESSING 단계에서 파싱된 응답을 변환·보강하려면 DomainResponseProcessor를 구현합니다:
@Component
public class ProductResponseProcessor
implements DomainResponseProcessor {
@Override
public boolean supports(String templateKey) {
return "PRODUCT_RECOMMENDATION".equals(templateKey);
}
@Override
public boolean supportsType(Class<?> responseType) {
return ProductRecommendationResponse.class
.isAssignableFrom(responseType);
}
@Override
public Object wrapResponse(
Object parsedData,
PipelineExecutionContext context) {
// Enrich the parsed response with execution metadata
if (parsedData instanceof ProductRecommendationResponse resp) {
resp.withMetadata("pipelineExecutionTime",
context.getExecutionTime());
resp.withMetadata("modelUsed",
context.getMetadata("modelName", String.class));
}
return parsedData;
}
}
설정
주요 pipeline·RAG 설정 노브:
| 속성 | 설명 | 기본값 |
|---|---|---|
PipelineConfiguration.timeoutSeconds(...) | 코드에서 구성하는 요청별 pipeline 타임아웃 | 300 |
contexa.streaming.timeout | 스트리밍 출력 조립에 허용되는 최대 시간 | PT5M |
contexa.rag.defaults.top-k | 검색할 유사 문서의 기본 개수 | 10 |
contexa.rag.defaults.similarity-threshold | 검색에 적용되는 기본 유사도 임계값 | 0.7 |
모든 pipeline·RAG·스트리밍 속성은 AI 설정을 참조하세요.
API 레퍼런스
PipelineOrchestrator
public class PipelineOrchestrator
생성자: public PipelineOrchestrator(List<PipelineExecutor> executors)
PipelineConfiguration
public class PipelineConfiguration
필드
| 속성 | 타입 | 기본값 | 설명 |
|---|---|---|---|
steps | List<PipelineStep> | 비어있음 | Pipeline 단계의 정렬된 목록. |
parameters | Map<String, Object> | 비어있음 | 단계 구성용 키-값 파라미터. |
timeoutSeconds | int | 300 | 최대 실행 시간. |
enableStreaming | boolean | false | 스트리밍 실행기 사용 여부. |
name | String | null | 선택적 pipeline 이름. |
description | String | null | 선택적 설명. |
Builder 메서드
PipelineStep Enum
| 값 | 설명 |
|---|---|
CONTEXT_RETRIEVAL | 벡터 스토어로부터 RAG 컨텍스트 검색. |
PREPROCESSING | 요청 검증, 정규화, 보강. |
PROMPT_GENERATION | 템플릿과 컨텍스트로부터 최종 프롬프트 구성. |
LLM_EXECUTION | UnifiedLLMOrchestrator를 통해 LLM에 프롬프트 전송. |
SOAR_TOOL_EXECUTION | 원시 LLM 결과와 응답 파싱 사이의 선택적 tool 실행 단계. |
RESPONSE_PARSING | LLM 출력을 타입 응답으로 파싱. |
POSTPROCESSING | 최종 보강·검증·도메인 처리. |
팩토리 메서드
PipelineStep 인터페이스 (커스텀 단계)
public interface PipelineStep
true.PipelineExecutor 인터페이스
public interface PipelineExecutor
PipelineOrchestrator가 실행기를 선택할 때 사용합니다.false.내장: UniversalPipelineExecutor(표준)와 StreamingUniversalPipelineExecutor(스트리밍).
ContextRetriever
public class ContextRetriever
생성자:
public ContextRetriever(VectorStore vectorStore, ContexaRagProperties ragProperties)—VectorStore를 기본VectorOperations파사드로 감쌉니다.public ContextRetriever(VectorOperations vectorOperations, ContexaRagProperties ragProperties)— 미리 구성된 파사드를 직접 받습니다.
서브클래스는 vectorStore 필드가 아닌 protected vectorOperations 필드를 통해 벡터 스토어에 접근합니다. vectorOperations.searchSimilar(...), storeDocuments(...) 같은 메서드를 사용합니다.
"naturalLanguageQuery"를 읽고, 값이 없으면 context.toString()으로 폴백합니다. 자체 도메인 필드로 쿼리를 유도하려면 오버라이드합니다.ContextRetrievalResult (ContextRetriever의 nested class)
생성자: ContextRetrievalResult(String contextInfo, List<Document> documents, Map<String, Object> metadata)
getContextInfo()— 프롬프트 템플릿에 전달되는 형식화된 컨텍스트 문자열.getDocuments()— 원시 일치 문서.getMetadata()— 검색 메타데이터. 기본 구현은documentsFound와searchQuery키를 채웁니다.
DomainResponseProcessor 인터페이스
public interface DomainResponseProcessor