contexa-core

Pipeline 및 RAG

AI Pipeline은 RAG로 관련 컨텍스트 조회부터 구조화된 LLM 응답 파싱까지 설정 가능한 단계 시퀀스를 통해 pipeline 기반 AI 요청을 처리합니다. PipelineOrchestrator가 적절한 실행기를 선택하고 전체 수명 주기를 관리합니다.

개요: 기본 6단계 Pipeline

Lab이 orchestrator.execute() 또는 orchestrator.executeStream()을 호출하면 기본 구성은 6개 처리 단계를 거칩니다. 런타임은 또한 pipeline이 명시적으로 추가할 때 LLM_EXECUTIONRESPONSE_PARSING 사이에 삽입할 수 있는 선택적 SOAR_TOOL_EXECUTION 단계를 정의합니다.

CONTEXT_RETRIEVAL
PREPROCESSING
PROMPT_GENERATION
LLM_EXECUTION
RESPONSE_PARSING
POSTPROCESSING

각 단계는 PipelineStep 빈으로 구현됩니다. 오케스트레이터가 PipelineExecutor(표준 또는 스트리밍)를 선택하고 구성된 단계를 순서대로 실행합니다. createPipelineConfig()는 기본적으로 선택적 SOAR 단계를 포함하지 않지만, enum과 실행기 모두 이를 지원합니다.

단계별 상세 분석

1. CONTEXT_RETRIEVAL

벡터 스토어에서 관련 컨텍스트를 조회해 도메인별 지식으로 LLM 프롬프트를 보강합니다.

  • 컴포넌트: ContextRetriever(또는 사용자의 커스텀 하위 클래스)
  • 입력: 도메인 컨텍스트와 선택적 자연어 질의를 가진 AIRequest
  • 출력: 조회된 컨텍스트 문자열, 일치 문서, 메타데이터를 담은 ContextRetrievalResult
  • 동작: retriever가 요청에서 검색 질의를 추출해 벡터 스토어에 유사도 검색을 수행하고, 일치 문서를 컨텍스트 문자열로 형식화합니다. 이 문자열은 프롬프트 템플릿에 contextInfo로 전달됩니다.

ContextRetrieverRegistry는 현재 컨텍스트 타입에 대한 retriever를 해결하고, 특정 등록이 없을 때 기본 retriever로 폴백합니다. 검색이 일치 결과를 반환하지 못하면 다운스트림 프롬프트는 빈 또는 약한 신호 컨텍스트 문자열로 그대로 실행됩니다.

2. PREPROCESSING

프롬프트 생성 전에 기본 시스템 메타데이터를 수집합니다.

  • 동작: 내장 PreprocessingStep이 요청 ID, 요청 타입, 컨텍스트 타입, 처리 시간을 담은 형식화된 메타데이터 문자열을 저장합니다. 커스텀 단계는 동일한 PREPROCESSING 단계를 대상으로 검증·정규화 로직을 추가할 수 있습니다.
  • 커스터마이징: PipelineStep 인터페이스를 구현해 자체 전처리 로직을 추가할 수 있습니다 (아래 커스텀 Pipeline 단계 참조).

3. PROMPT_GENERATION

도메인 시스템 프롬프트, 사용자 프롬프트, 조회된 컨텍스트로 최종 프롬프트를 구성합니다.

  • 컴포넌트: 사용자의 PromptTemplate 구현 (TemplateType으로 매칭)
  • 동작: 프레임워크가 일치하는 템플릿의 generateSystemPrompt()generateUserPrompt()를 호출합니다. 표준 템플릿은 JSON 형식 지시문이 시스템 프롬프트에 자동 추가되며, 스트리밍 템플릿은 스트리밍 프로토콜 지시문과 JSON 스키마 예제가 주입됩니다.
  • 결과: 시스템·사용자 메시지를 가진 Spring AI Prompt 객체로, LLM 실행 준비 완료 상태입니다.

4. LLM_EXECUTION

구성된 프롬프트를 UnifiedLLMOrchestrator를 통해 LLM에 전송합니다.

  • 컴포넌트: UnifiedLLMOrchestrator
  • 동작: 오케스트레이터가 요청의 tier 또는 분석 수준을 기반으로 모델을 선택하고, 등록된 Advisor를 적용하며, 프롬프트를 LLM에 전송합니다. 표준 모드에서는 전체 텍스트 응답을 반환하고, 스트리밍 모드에서는 청크의 Flux<String>을 반환합니다.
  • 설정: 모델 선택과 tier 타임아웃은 spring.ai.security·spring.ai.security.tiered로 구성합니다. 스트리밍과 검색 동작은 contexa.streaming·contexa.rag로 별도 설정합니다. LLM 및 모델을 참조하세요.

5. SOAR_TOOL_EXECUTION (선택)

Pipeline 구성에 SOAR 단계가 포함될 때 응답 파싱 전 LLM 후 tool 기반 액션을 실행합니다.

  • 구성 단계: PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION
  • 동작: 표준 실행기는 LLM 결과를 파싱 전에 tool 실행 단계로 전달할 수 있습니다. ResponseParsingStep이 이 단계를 먼저 확인하고, SOAR 단계가 없으면 원시 LLM_EXECUTION 결과로 폴백합니다.
  • 기본 동작: 내장 기본 pipeline 구성은 이 단계를 자동 추가하지 않습니다.

6. RESPONSE_PARSING

원시 LLM 텍스트 출력을 타입이 지정된 응답 객체로 파싱합니다.

  • 동작: 프레임워크가 LLM 응답에서 JSON을 추출하고(마크다운 코드 블록·여분 텍스트 등 처리), BeanOutputConverter를 사용해 AIResponse 하위 클래스로 역직렬화합니다. JSON 추출에 실패하면 폴백 파서가 괄호 균형 맞춤·복구를 시도합니다.
  • 스트리밍 참고: 이 단계는 스트리밍 모드에서 사용되지 않습니다. 대신 JsonStreamingProcessor가 프로토콜 마커를 사용해 스트림에서 JSON 추출을 처리합니다.

7. POSTPROCESSING

파싱된 응답에 대한 최종 보강·검증·도메인별 처리.

  • 컴포넌트: DomainResponseProcessor (선택, 커스텀)
  • 동작: 요청의 템플릿 키를 지원하는 DomainResponseProcessor가 등록되어 있다면, 그 wrapResponse() 메서드가 호출되어 파싱된 응답을 변환·보강합니다. 실행 메타데이터(타이밍·요청 ID)도 첨부됩니다.
  • 스트리밍 참고: 이 단계는 스트리밍 모드에서 사용되지 않습니다.

표준 vs 스트리밍 Pipeline

Pipeline은 PipelineConfiguration에 따라 두 가지 모드로 동작합니다:

측면표준 Pipeline스트리밍 Pipeline
단계기본 6단계, 명시적 구성 시 선택적 SOAR_TOOL_EXECUTION 포함처음 4단계 (LLM_EXECUTION까지)
실행기UniversalPipelineExecutorStreamingUniversalPipelineExecutor
반환 타입Mono<R> (타입 응답)Flux<String> (청크 스트림)
JSON 처리RESPONSE_PARSING의 BeanOutputConverter프로토콜 마커가 있는 JsonStreamingProcessor
구성 플래그enableStreaming(false) (기본값)enableStreaming(true)
팩토리 메서드PipelineConfiguration.createPipelineConfig()PipelineConfiguration.createStreamPipelineConfig()

표준 Pipeline 구성

Java
// Default 6-step pipeline, no streaming
PipelineConfiguration config = PipelineConfiguration.builder()
    .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
    .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
    .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
    .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
    .addStep(PipelineConfiguration.PipelineStep.RESPONSE_PARSING)
    .addStep(PipelineConfiguration.PipelineStep.POSTPROCESSING)
    .timeoutSeconds(300)
    .build();

Mono<MyResponse> result = orchestrator.execute(
    aiRequest, config, MyResponse.class);

스트리밍 Pipeline 구성

Java
// First 4 steps, streaming enabled
PipelineConfiguration config = PipelineConfiguration.builder()
    .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
    .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
    .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
    .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
    .enableStreaming(true)
    .timeoutSeconds(300)
    .build();

Flux<String> chunks = orchestrator.executeStream(
    aiRequest, config);

내장 팩토리 메서드를 사용할 수도 있습니다:

Java
// Equivalent to the manual configurations above
PipelineConfiguration.createPipelineConfig();       // standard
PipelineConfiguration.createStreamPipelineConfig();  // streaming

선택적 SOAR 단계가 필요하면 구성을 수동으로 빌드하고 LLM_EXECUTIONRESPONSE_PARSING 사이에 PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION을 추가합니다.

RAG 통합 가이드

RAG(Retrieval-Augmented Generation)는 데이터에서 관련 컨텍스트를 제공하여 LLM 응답을 향상시킵니다. ContextRetriever가 사용 가능할 때 CONTEXT_RETRIEVAL 단계가 이를 자동 처리합니다.

Pipeline에서 RAG 동작 방식

  1. CONTEXT_RETRIEVAL 단계가 사용자의 ContextRetriever.retrieveContext()를 호출합니다.
  2. retriever가 요청 질의와 유사한 문서를 벡터 스토어에서 검색합니다.
  3. 일치 문서가 컨텍스트 문자열로 형식화됩니다.
  4. 컨텍스트 문자열이 PROMPT_GENERATION 단계의 generateUserPrompt(request, contextInfo)에 전달됩니다.
  5. 프롬프트 템플릿이 LLM에 전송되는 사용자 프롬프트에 이 컨텍스트를 포함합니다.

ContextRetriever 구현

Java
@Component
public class MyContextRetriever extends ContextRetriever {

    public MyContextRetriever(
            VectorStore vectorStore,
            ContexaRagProperties ragProperties) {
        super(vectorStore, ragProperties);
    }

    @Override
    public ContextRetrievalResult retrieveContext(
            AIRequest<? extends DomainContext> request) {
        // Extract search query from your domain context
        String query = extractQueryFromRequest(request);

        // Search the vector store via the VectorOperations facade
        List<Document> docs = vectorOperations
            .searchSimilar(query);

        // Format results as context string
        String contextInfo = docs.stream()
            .map(doc -> doc.getText())
            .collect(Collectors.joining("\n\n"));

        return new ContextRetrievalResult(
            contextInfo, docs,
            Map.of("retrieverType", "custom",
                    "docCount", docs.size()));
    }
}

RAG용 문서 저장

검색이 작동하려면 먼저 벡터 스토어에 문서가 있어야 합니다. Spring AI의 VectorStore 인터페이스를 사용해 문서를 저장합니다:

Java
// Store documents with metadata
Document doc = new Document(
    "Product A is a lightweight laptop ideal for travel",
    Map.of("category", "electronics",
           "productId", "PROD-001"));

vectorStore.add(List.of(doc));
벡터 스토어 설정
기본 retriever는 구성된 Spring AI VectorStore와 함께 작동합니다. PgVector가 연결되면 아래 RAG 기본값을 사용하고 벡터 스토어 연결을 별도로 구성합니다:
contexa:
  rag:
    defaults:
      top-k: 10
      similarity-threshold: 0.7
모든 RAG 속성은 AI 설정을 참조하세요.

커스텀 Pipeline 단계

PipelineStep 인터페이스를 구현해 어느 단계에든 커스텀 처리를 추가할 수 있습니다. 커스텀 단계는 Spring 빈으로 자동 발견되어 getConfigStep()getOrder()를 기반으로 pipeline에 삽입됩니다.

Java
@Component
public class InputNormalizationStep implements PipelineStep {

    @Override
    public <T extends DomainContext> Mono<Object> execute(
            AIRequest<T> request,
            PipelineExecutionContext context) {
        // Normalize whitespace, trim input, add metadata
        String normalized = request.getContext()
            .getDomainType().trim().toUpperCase();
        context.addMetadata("normalizedType", normalized);
        context.addMetadata("processedAt",
            Instant.now().toString());
        return Mono.just(request);
    }

    @Override
    public PipelineConfiguration.PipelineStep getConfigStep() {
        // Runs during the PREPROCESSING phase
        return PipelineConfiguration.PipelineStep.PREPROCESSING;
    }

    @Override
    public int getOrder() {
        // Lower order = runs first within the phase
        // Default steps run at order 100
        return 50;
    }
}

여러 커스텀 단계가 동일한 pipeline 단계를 대상으로 할 수 있습니다. getOrder() 순서로 실행되며, 낮은 값이 먼저 실행됩니다.

커스텀 응답 프로세서

POSTPROCESSING 단계에서 파싱된 응답을 변환·보강하려면 DomainResponseProcessor를 구현합니다:

Java
@Component
public class ProductResponseProcessor
        implements DomainResponseProcessor {

    @Override
    public boolean supports(String templateKey) {
        return "PRODUCT_RECOMMENDATION".equals(templateKey);
    }

    @Override
    public boolean supportsType(Class<?> responseType) {
        return ProductRecommendationResponse.class
            .isAssignableFrom(responseType);
    }

    @Override
    public Object wrapResponse(
            Object parsedData,
            PipelineExecutionContext context) {
        // Enrich the parsed response with execution metadata
        if (parsedData instanceof ProductRecommendationResponse resp) {
            resp.withMetadata("pipelineExecutionTime",
                context.getExecutionTime());
            resp.withMetadata("modelUsed",
                context.getMetadata("modelName", String.class));
        }
        return parsedData;
    }
}

설정

주요 pipeline·RAG 설정 노브:

속성설명기본값
PipelineConfiguration.timeoutSeconds(...)코드에서 구성하는 요청별 pipeline 타임아웃300
contexa.streaming.timeout스트리밍 출력 조립에 허용되는 최대 시간PT5M
contexa.rag.defaults.top-k검색할 유사 문서의 기본 개수10
contexa.rag.defaults.similarity-threshold검색에 적용되는 기본 유사도 임계값0.7
전체 설정 레퍼런스
모든 pipeline·RAG·스트리밍 속성은 AI 설정을 참조하세요.

API 레퍼런스

PipelineOrchestrator
public class PipelineOrchestrator

생성자: public PipelineOrchestrator(List<PipelineExecutor> executors)

execute(AIRequest<T> request, Class<R> responseType) Mono<R>
기본 구성으로 pipeline을 실행합니다.
execute(AIRequest<T> request, PipelineConfiguration config, Class<R> responseType) Mono<R>
커스텀 구성으로 실행합니다. 실패 시 자동 폴백 응답 포함.
executeStream(AIRequest<T> request) Flux<String>
기본 스트리밍 구성으로 스트리밍 실행.
executeStream(AIRequest<T> request, PipelineConfiguration config) Flux<String>
커스텀 구성으로 스트리밍 실행.
PipelineConfiguration
public class PipelineConfiguration

필드

속성타입기본값설명
stepsList<PipelineStep>비어있음Pipeline 단계의 정렬된 목록.
parametersMap<String, Object>비어있음단계 구성용 키-값 파라미터.
timeoutSecondsint300최대 실행 시간.
enableStreamingbooleanfalse스트리밍 실행기 사용 여부.
nameStringnull선택적 pipeline 이름.
descriptionStringnull선택적 설명.

Builder 메서드

builder().addStep(PipelineStep step) Builder
구성에 단계를 추가합니다.
builder().addParameter(String key, Object value) Builder
파라미터를 추가합니다.
builder().timeoutSeconds(int timeout) Builder
타임아웃을 설정합니다.
builder().enableStreaming(boolean enable) Builder
스트리밍 모드를 활성화·비활성화합니다.

PipelineStep Enum

설명
CONTEXT_RETRIEVAL벡터 스토어로부터 RAG 컨텍스트 검색.
PREPROCESSING요청 검증, 정규화, 보강.
PROMPT_GENERATION템플릿과 컨텍스트로부터 최종 프롬프트 구성.
LLM_EXECUTIONUnifiedLLMOrchestrator를 통해 LLM에 프롬프트 전송.
SOAR_TOOL_EXECUTION원시 LLM 결과와 응답 파싱 사이의 선택적 tool 실행 단계.
RESPONSE_PARSINGLLM 출력을 타입 응답으로 파싱.
POSTPROCESSING최종 보강·검증·도메인 처리.

팩토리 메서드

createPipelineConfig() PipelineConfiguration
선택적 SOAR 단계 없이 6단계로 구성된 기본 config. 타임아웃 300초.
createStreamPipelineConfig() PipelineConfiguration
처음 4단계로 구성된 스트리밍 config. 스트리밍 활성화, 타임아웃 300초.
PipelineStep 인터페이스 (커스텀 단계)
public interface PipelineStep
execute(AIRequest<T> request, PipelineExecutionContext context) Mono<Object>
이 단계의 로직을 실행합니다.
getConfigStep() PipelineConfiguration.PipelineStep
이 단계가 속하는 pipeline 단계를 반환합니다.
getOrder() int
단계 내 실행 순서. 기본값: 100. 낮은 값이 먼저 실행됩니다.
canExecute(AIRequest<T> request) boolean
주어진 요청에 대해 이 단계가 실행되어야 하는지 여부. 기본값: true.
PipelineExecutor 인터페이스
public interface PipelineExecutor
execute(AIRequest<T> request, PipelineConfiguration config, Class<R> responseType) Mono<R>
Pipeline 단계를 실행하고 타입이 지정된 응답을 반환합니다.
executeStream(AIRequest<T> request, PipelineConfiguration config) Flux<String>
스트리밍 실행.
getSupportedDomain() String
이 실행기가 전담하는 도메인을 반환합니다.
supportsConfiguration(PipelineConfiguration config) boolean
주어진 구성을 이 실행기가 처리할 수 있는지 여부. PipelineOrchestrator가 실행기를 선택할 때 사용합니다.
getPriority() int
지원 실행기 중 선택 우선순위. 기본값 100. 낮은 값이 먼저 검토됩니다.
supportsStreaming() boolean
이 실행기가 스트리밍을 지원하는지 여부. 기본값 false.

내장: UniversalPipelineExecutor(표준)와 StreamingUniversalPipelineExecutor(스트리밍).

ContextRetriever
public class ContextRetriever

생성자:

  • public ContextRetriever(VectorStore vectorStore, ContexaRagProperties ragProperties)VectorStore를 기본 VectorOperations 파사드로 감쌉니다.
  • public ContextRetriever(VectorOperations vectorOperations, ContexaRagProperties ragProperties) — 미리 구성된 파사드를 직접 받습니다.

서브클래스는 vectorStore 필드가 아닌 protected vectorOperations 필드를 통해 벡터 스토어에 접근합니다. vectorOperations.searchSimilar(...), storeDocuments(...) 같은 메서드를 사용합니다.

retrieveContext(AIRequest<? extends DomainContext> request) ContextRetrievalResult
요청을 기반으로 벡터 스토어에서 컨텍스트를 검색합니다.
extractQueryFromRequest(AIRequest<? extends DomainContext> request) String
기본 구현은 요청 파라미터 "naturalLanguageQuery"를 읽고, 값이 없으면 context.toString()으로 폴백합니다. 자체 도메인 필드로 쿼리를 유도하려면 오버라이드합니다.

ContextRetrievalResult (ContextRetriever의 nested class)

생성자: ContextRetrievalResult(String contextInfo, List<Document> documents, Map<String, Object> metadata)

  • getContextInfo() — 프롬프트 템플릿에 전달되는 형식화된 컨텍스트 문자열.
  • getDocuments() — 원시 일치 문서.
  • getMetadata() — 검색 메타데이터. 기본 구현은 documentsFoundsearchQuery 키를 채웁니다.
DomainResponseProcessor 인터페이스
public interface DomainResponseProcessor
supports(String templateKey) boolean
이 프로세서가 주어진 템플릿 키를 처리하는지 여부.
supportsType(Class<?> responseType) boolean
이 프로세서가 주어진 응답 타입을 처리하는지 여부.
wrapResponse(Object parsedData, PipelineExecutionContext context) Object
파싱된 응답을 변환·보강합니다.
getOrder() int
처리 순서. 기본값은 인터페이스에서 제공.