contexa-core

Pipeline & RAG

The AI Pipeline processes AI requests through a configurable sequence of steps, from retrieving relevant context via RAG to parsing the structured LLM response. The PipelineOrchestrator selects the appropriate executor and manages the full request lifecycle.

Overview: Default 6-Step Pipeline

When your Lab calls orchestrator.execute() or orchestrator.executeStream(), the default configuration flows through 6 processing steps. The runtime also defines an optional SOAR_TOOL_EXECUTION phase that can be inserted between LLM_EXECUTION and RESPONSE_PARSING when a pipeline explicitly adds it.

CONTEXT_RETRIEVAL
PREPROCESSING
PROMPT_GENERATION
LLM_EXECUTION
RESPONSE_PARSING
POSTPROCESSING

Each step is implemented as a PipelineStep bean. The orchestrator selects a PipelineExecutor (standard or streaming) and runs the configured steps in sequence. createPipelineConfig() does not include the optional SOAR phase by default, but the enum and executor support it.
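
The executor selection can be sketched with a minimal stand-in for the executor interface. Only supportsStreaming() comes from the framework's API; the selection logic shown here is an illustrative assumption, not the actual implementation:

```java
import java.util.List;

public class ExecutorSelectionSketch {

    // Minimal stand-in for the framework's PipelineExecutor interface.
    public interface PipelineExecutor {
        boolean supportsStreaming();
    }

    // Picks the first executor whose streaming capability matches the
    // configuration's enableStreaming flag.
    public static PipelineExecutor select(List<PipelineExecutor> executors,
                                          boolean enableStreaming) {
        return executors.stream()
            .filter(e -> e.supportsStreaming() == enableStreaming)
            .findFirst()
            .orElseThrow(() -> new IllegalStateException(
                "No executor registered for streaming=" + enableStreaming));
    }
}
```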

Step-by-Step Deep Dive

1. CONTEXT_RETRIEVAL

Retrieves relevant context from a vector store to augment the LLM prompt with domain-specific knowledge.

  • Component: ContextRetriever (or your custom subclass)
  • Input: The AIRequest with domain context and optional natural language query
  • Output: ContextRetrievalResult containing the retrieved context string, matching documents, and metadata
  • What happens: The retriever extracts a search query from the request, performs a similarity search against the vector store, and formats the matching documents into a context string. This string is passed to the prompt template as contextInfo.

The ContextRetrieverRegistry resolves the retriever for the current context type and falls back to the default retriever when no specific registration exists. If retrieval returns no matches, the downstream prompt still runs with an empty or low-signal context string.
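
The lookup-with-fallback behavior can be sketched as follows. Retrievers are represented as plain strings, and the method names are illustrative rather than the registry's actual API:

```java
import java.util.HashMap;
import java.util.Map;

public class RetrieverRegistrySketch {

    // Retrievers keyed by context type; a String stands in for the
    // real ContextRetriever instances.
    private final Map<String, String> retrieversByContextType = new HashMap<>();
    private final String defaultRetriever;

    public RetrieverRegistrySketch(String defaultRetriever) {
        this.defaultRetriever = defaultRetriever;
    }

    public void register(String contextType, String retriever) {
        retrieversByContextType.put(contextType, retriever);
    }

    // Falls back to the default retriever when no specific registration exists.
    public String resolve(String contextType) {
        return retrieversByContextType.getOrDefault(contextType, defaultRetriever);
    }
}
```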

2. PREPROCESSING

Collects baseline system metadata before prompt generation.

  • What happens: The built-in PreprocessingStep stores a formatted metadata string containing request ID, request type, context type, and processing time. Custom steps can still add validation or normalization logic by targeting the same PREPROCESSING phase.
  • Customizable: Implement the PipelineStep interface to add your own preprocessing logic (see Custom Pipeline Steps below).

3. PROMPT_GENERATION

Constructs the final prompt from the domain system prompt, user prompt, and retrieved context.

  • Component: Your PromptTemplate implementation (matched by TemplateType)
  • What happens: The framework calls generateSystemPrompt() and generateUserPrompt() on the matching template. For standard templates, JSON format instructions are automatically appended to the system prompt. For streaming templates, the streaming protocol instructions and JSON schema example are injected.
  • Result: A Spring AI Prompt object with system and user messages, ready for LLM execution.

4. LLM_EXECUTION

Sends the constructed prompt to the LLM via UnifiedLLMOrchestrator.

  • Component: UnifiedLLMOrchestrator
  • What happens: The orchestrator selects the model based on the request's tier or analysis level, applies registered Advisors, and sends the prompt to the LLM. In standard mode, it returns the full text response. In streaming mode, it returns a Flux<String> of chunks.
  • Configuration: Model selection and tier timeouts are configured through spring.ai.security and spring.ai.security.tiered. Streaming and retrieval behavior are configured separately through contexa.streaming and contexa.rag. See LLM & Models.

5. SOAR_TOOL_EXECUTION (Optional)

Executes tool-driven post-LLM actions before response parsing when the pipeline configuration includes the SOAR phase.

  • Config step: PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION
  • What happens: The standard executor can forward the LLM result into a tool-execution phase before parsing. ResponseParsingStep checks this phase first and falls back to the raw LLM_EXECUTION result when no SOAR phase is present.
  • Default behavior: The built-in default pipeline configuration does not add this step automatically.
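
The parsing-input fallback can be sketched as a lookup over per-phase results. The map keys mirror the enum names, but the data structure itself is an assumption for illustration:

```java
import java.util.Map;

public class ParsingInputSketch {

    // Prefer the SOAR phase result when present; otherwise fall back to
    // the raw LLM_EXECUTION result, as ResponseParsingStep does.
    public static String resolveParsingInput(Map<String, String> phaseResults) {
        return phaseResults.getOrDefault("SOAR_TOOL_EXECUTION",
            phaseResults.get("LLM_EXECUTION"));
    }
}
```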

6. RESPONSE_PARSING

Parses the raw LLM text output into your typed response object.

  • What happens: The framework extracts JSON from the LLM response (handling markdown code blocks, extra text, etc.), then deserializes it into your AIResponse subclass using BeanOutputConverter. If JSON extraction fails, a fallback parser attempts bracket balancing and repair.
  • Streaming note: This step is not used in streaming mode. Instead, the JsonStreamingProcessor handles JSON extraction from the stream using protocol markers.
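
A simplified, self-contained sketch of this extraction (the real parser also attempts repair of unbalanced output, which this version only signals by returning null):

```java
public class JsonExtractionSketch {

    // Simplified sketch of pulling a JSON object out of an LLM reply that
    // may wrap it in a markdown code fence or surround it with prose.
    public static String extractJson(String llmOutput) {
        // Strip a ```json ... ``` fence if present.
        int fenceStart = llmOutput.indexOf("```");
        if (fenceStart >= 0) {
            int bodyStart = llmOutput.indexOf('\n', fenceStart) + 1;
            int fenceEnd = llmOutput.indexOf("```", bodyStart);
            if (bodyStart > 0 && fenceEnd > bodyStart) {
                llmOutput = llmOutput.substring(bodyStart, fenceEnd);
            }
        }
        // Fall back to the first balanced {...} span.
        int start = llmOutput.indexOf('{');
        if (start < 0) return null;
        int depth = 0;
        for (int i = start; i < llmOutput.length(); i++) {
            char c = llmOutput.charAt(i);
            if (c == '{') depth++;
            else if (c == '}' && --depth == 0) {
                return llmOutput.substring(start, i + 1);
            }
        }
        return null; // unbalanced: the real parser would attempt repair here
    }
}
```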

7. POSTPROCESSING

Final enrichment, validation, and domain-specific processing of the parsed response.

  • Component: DomainResponseProcessor (optional, custom)
  • What happens: If a DomainResponseProcessor is registered that supports the request's template key, its wrapResponse() method is called to transform or enrich the parsed response. Execution metadata (timing, request ID) is also attached.
  • Streaming note: This step is not used in streaming mode.

Standard vs Streaming Pipeline

The pipeline operates in two modes depending on the PipelineConfiguration:

Aspect          | Standard Pipeline                                                        | Streaming Pipeline
Steps           | Default 6 steps, plus optional SOAR_TOOL_EXECUTION if explicitly configured | First 4 steps (through LLM_EXECUTION)
Executor        | UniversalPipelineExecutor                                                | StreamingUniversalPipelineExecutor
Return type     | Mono<R> (typed response)                                                 | Flux<String> (chunk stream)
JSON handling   | BeanOutputConverter in RESPONSE_PARSING                                  | JsonStreamingProcessor with protocol markers
Config flag     | enableStreaming(false) (default)                                         | enableStreaming(true)
Factory method  | PipelineConfiguration.createPipelineConfig()                             | PipelineConfiguration.createStreamPipelineConfig()

Standard Pipeline Configuration

Java
// Default 6-step pipeline, no streaming
PipelineConfiguration config = PipelineConfiguration.builder()
    .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
    .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
    .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
    .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
    .addStep(PipelineConfiguration.PipelineStep.RESPONSE_PARSING)
    .addStep(PipelineConfiguration.PipelineStep.POSTPROCESSING)
    .timeoutSeconds(300)
    .build();

Mono<MyResponse> result = orchestrator.execute(
    aiRequest, config, MyResponse.class);

Streaming Pipeline Configuration

Java
// First 4 steps, streaming enabled
PipelineConfiguration config = PipelineConfiguration.builder()
    .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
    .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
    .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
    .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
    .enableStreaming(true)
    .timeoutSeconds(300)
    .build();

Flux<String> chunks = orchestrator.executeStream(
    aiRequest, config);

You can also use the built-in factory methods:

Java
// Equivalent to the manual configurations above
PipelineConfiguration.createPipelineConfig();       // standard
PipelineConfiguration.createStreamPipelineConfig();  // streaming

If you need the optional SOAR phase, build the configuration manually and add PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION between LLM_EXECUTION and RESPONSE_PARSING.

RAG Integration Guide

RAG (Retrieval-Augmented Generation) enhances LLM responses by providing relevant context from your data. The CONTEXT_RETRIEVAL step handles this automatically when a ContextRetriever is available.

How RAG Works in the Pipeline

  1. The CONTEXT_RETRIEVAL step calls your ContextRetriever.retrieveContext()
  2. The retriever searches the vector store for documents similar to the request query
  3. Matching documents are formatted into a context string
  4. The context string is passed to generateUserPrompt(request, contextInfo) in the PROMPT_GENERATION step
  5. Your prompt template includes this context in the user prompt sent to the LLM
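
Step 3 above can be sketched as a simple join over the matching documents' text, consistent with the "\n\n" separator used in the retriever example that follows:

```java
import java.util.List;

public class ContextFormattingSketch {

    // Step 3 sketched: concatenate matching document texts into one
    // context string, separated by blank lines.
    public static String formatContext(List<String> documentTexts) {
        return String.join("\n\n", documentTexts);
    }
}
```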

Implementing a ContextRetriever

Java
@Component
public class MyContextRetriever extends ContextRetriever {

    public MyContextRetriever(
            VectorStore vectorStore,
            ContexaRagProperties ragProperties) {
        super(vectorStore, ragProperties);
    }

    @Override
    public ContextRetrievalResult retrieveContext(
            AIRequest<? extends DomainContext> request) {
        // Extract search query from your domain context
        String query = extractQueryFromRequest(request);

        // Search the vector store
        List<Document> docs = vectorStore
            .similaritySearch(query);

        // Format results as context string
        String contextInfo = docs.stream()
            .map(doc -> doc.getText())
            .collect(Collectors.joining("\n\n"));

        return new ContextRetrievalResult(
            contextInfo, docs,
            Map.of("retrieverType", "custom",
                    "docCount", docs.size()));
    }
}

Storing Documents for RAG

Before retrieval works, you need documents in your vector store. Use Spring AI's VectorStore interface to store documents:

Java
// Store documents with metadata
Document doc = new Document(
    "Product A is a lightweight laptop ideal for travel",
    Map.of("category", "electronics",
           "productId", "PROD-001"));

vectorStore.add(List.of(doc));

Vector Store Setup

The default retriever works with the configured Spring AI VectorStore. When PgVector is wired, use the RAG defaults below and configure the vector store connection separately:

YAML
contexa:
  rag:
    defaults:
      top-k: 10
      similarity-threshold: 0.7

See AI Configuration for all RAG properties.

Custom Pipeline Steps

You can add custom processing at any stage by implementing the PipelineStep interface. Custom steps are auto-discovered as Spring beans and inserted into the pipeline based on their getConfigStep() and getOrder().

Java
@Component
public class InputNormalizationStep implements PipelineStep {

    @Override
    public <T extends DomainContext> Mono<Object> execute(
            AIRequest<T> request,
            PipelineExecutionContext context) {
        // Normalize whitespace, trim input, add metadata
        String normalized = request.getContext()
            .getDomainType().trim().toUpperCase();
        context.addMetadata("normalizedType", normalized);
        context.addMetadata("processedAt",
            Instant.now().toString());
        return Mono.just(request);
    }

    @Override
    public PipelineConfiguration.PipelineStep getConfigStep() {
        // Runs during the PREPROCESSING phase
        return PipelineConfiguration.PipelineStep.PREPROCESSING;
    }

    @Override
    public int getOrder() {
        // Lower order = runs first within the phase
        // Default steps run at order 100
        return 50;
    }
}

Multiple custom steps can target the same pipeline phase. They execute in getOrder() order, with lower values running first.
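
Intra-phase ordering can be sketched as a sort on the order value. Step here is a minimal stand-in record, not a framework type:

```java
import java.util.Comparator;
import java.util.List;

public class StepOrderingSketch {

    // Minimal stand-in for a registered step within one phase.
    public record Step(String name, int order) {}

    // Steps targeting the same phase run in ascending order;
    // built-in steps default to order 100.
    public static List<Step> inExecutionOrder(List<Step> stepsInPhase) {
        return stepsInPhase.stream()
            .sorted(Comparator.comparingInt(Step::order))
            .toList();
    }
}
```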

Custom Response Processor

Implement DomainResponseProcessor to transform or enrich the parsed response during the POSTPROCESSING step:

Java
@Component
public class ProductResponseProcessor
        implements DomainResponseProcessor {

    @Override
    public boolean supports(String templateKey) {
        return "PRODUCT_RECOMMENDATION".equals(templateKey);
    }

    @Override
    public boolean supportsType(Class<?> responseType) {
        return ProductRecommendationResponse.class
            .isAssignableFrom(responseType);
    }

    @Override
    public Object wrapResponse(
            Object parsedData,
            PipelineExecutionContext context) {
        // Enrich the parsed response with execution metadata
        if (parsedData instanceof ProductRecommendationResponse resp) {
            resp.withMetadata("pipelineExecutionTime",
                context.getElapsedTimeMs());
            resp.withMetadata("modelUsed",
                context.getMetadata("modelName"));
        }
        return parsedData;
    }
}

Configuration

Key pipeline and RAG knobs:

Property                                  | Description                                            | Default
PipelineConfiguration.timeoutSeconds(...) | Per-request pipeline timeout configured in code        | 300
contexa.streaming.timeout                 | Maximum duration allowed for streaming output assembly | PT5M
contexa.rag.defaults.top-k                | Default number of similar documents to retrieve        | 10
contexa.rag.defaults.similarity-threshold | Default similarity threshold applied to retrieval      | 0.7

Full Configuration Reference

See AI Configuration for all pipeline, RAG, and streaming properties.

API Reference

PipelineOrchestrator
public class PipelineOrchestrator

Constructor: public PipelineOrchestrator(List<PipelineExecutor> executors)

execute(AIRequest<T> request, Class<R> responseType) Mono<R>
Executes the pipeline with the default configuration.
execute(AIRequest<T> request, PipelineConfiguration config, Class<R> responseType) Mono<R>
Executes with a custom configuration. Includes automatic fallback response on failure.
executeStream(AIRequest<T> request) Flux<String>
Streaming execution with default streaming configuration.
executeStream(AIRequest<T> request, PipelineConfiguration config) Flux<String>
Streaming execution with custom configuration.

PipelineConfiguration
public class PipelineConfiguration

Fields

Property        | Type                | Default | Description
steps           | List<PipelineStep>  | Empty   | Ordered list of pipeline steps.
parameters      | Map<String, Object> | Empty   | Key-value parameters for step configuration.
timeoutSeconds  | int                 | 300     | Maximum execution time in seconds.
enableStreaming | boolean             | false   | Whether to use the streaming executor.
name            | String              | null    | Optional pipeline name.
description     | String              | null    | Optional description.

Builder Methods

builder().addStep(PipelineStep step) Builder
Adds a step to the configuration.
builder().addParameter(String key, Object value) Builder
Adds a parameter.
builder().timeoutSeconds(int timeout) Builder
Sets the timeout.
builder().enableStreaming(boolean enable) Builder
Enables or disables streaming mode.

PipelineStep Enum

Value               | Description
CONTEXT_RETRIEVAL   | RAG context retrieval from vector stores.
PREPROCESSING       | Request validation, normalization, and enrichment.
PROMPT_GENERATION   | Constructs the final prompt from templates and context.
LLM_EXECUTION       | Sends the prompt to the LLM via UnifiedLLMOrchestrator.
SOAR_TOOL_EXECUTION | Optional tool-execution phase between the raw LLM result and response parsing.
RESPONSE_PARSING    | Parses the LLM output into the typed response.
POSTPROCESSING      | Final enrichment, validation, and domain processing.

Factory Methods

createPipelineConfig() PipelineConfiguration
Default config with 6 steps (without the optional SOAR phase). Timeout: 300s.
createStreamPipelineConfig() PipelineConfiguration
Streaming config with first 4 steps. Streaming enabled, timeout: 300s.

PipelineStep Interface (custom steps)
public interface PipelineStep
execute(AIRequest<T> request, PipelineExecutionContext context) Mono<Object>
Executes this step's logic.
getConfigStep() PipelineConfiguration.PipelineStep
Returns which pipeline phase this step belongs to.
getOrder() int
Execution order within the phase. Default: 100. Lower values run first.
canExecute(AIRequest<T> request) boolean
Whether this step should execute for the given request. Default: true.

PipelineExecutor Interface
public interface PipelineExecutor
execute(AIRequest<T> request, PipelineConfiguration config, Class<R> responseType) Mono<R>
Executes pipeline steps and returns the typed response.
executeStream(AIRequest<T> request, PipelineConfiguration config) Flux<String>
Streaming execution.
supportsStreaming() boolean
Whether this executor supports streaming.

Built-in: UniversalPipelineExecutor (standard) and StreamingUniversalPipelineExecutor (streaming).

ContextRetriever
public class ContextRetriever

Constructor: public ContextRetriever(VectorStore vectorStore, ContexaRagProperties ragProperties)

retrieveContext(AIRequest<? extends DomainContext> request) ContextRetrievalResult
Retrieves context from the vector store based on the request.

ContextRetrievalResult

Constructor: ContextRetrievalResult(String contextInfo, List<Document> documents, Map<String, Object> metadata)

  • getContextInfo() — The formatted context string passed to the prompt template
  • getDocuments() — The raw matching documents
  • getMetadata() — Retrieval metadata (retriever type, timestamps, etc.)

DomainResponseProcessor Interface
public interface DomainResponseProcessor
supports(String templateKey) boolean
Whether this processor handles the given template key.
supportsType(Class<?> responseType) boolean
Whether this processor handles the given response type.
wrapResponse(Object parsedData, PipelineExecutionContext context) Object
Transforms or enriches the parsed response.
getOrder() int
Processing order. Default provided by the interface.