Pipeline & RAG
The AI Pipeline processes pipeline-backed AI requests through a configurable sequence of steps — from retrieving relevant context via RAG to parsing the structured LLM response. The PipelineOrchestrator selects the appropriate executor and manages the full lifecycle.
Overview: Default 6-Step Pipeline
When your Lab calls orchestrator.execute() or orchestrator.executeStream(), the default configuration flows through 6 processing steps. The runtime also defines an optional SOAR_TOOL_EXECUTION phase that can be inserted between LLM_EXECUTION and RESPONSE_PARSING when a pipeline explicitly adds it.
Each step is implemented as a PipelineStep bean. The orchestrator selects a PipelineExecutor (standard or streaming) and runs the configured steps in sequence. createPipelineConfig() does not include the optional SOAR phase by default, but the enum and executor support it.
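The sequencing model can be sketched in plain Java. This is an illustrative simplification, not the framework's actual executor: the string phase names and the shared-context map are assumptions, standing in for the real `PipelineStep` beans and `PipelineExecutionContext`. The key idea it shows is that each phase stores its result under its own name, so later phases can look up earlier results.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch of the sequencing model: each phase reads the shared
// context and stores its result under its own name, so a later phase (like
// RESPONSE_PARSING) can look up an earlier result (like LLM_EXECUTION).
public class PipelineSketch {

    public static Map<String, Object> run(
            List<String> phases,
            Map<String, BiFunction<String, Map<String, Object>, Object>> steps,
            String request) {
        Map<String, Object> context = new LinkedHashMap<>();
        for (String phase : phases) {
            Object result = steps.get(phase).apply(request, context);
            context.put(phase, result);
        }
        return context;
    }

    public static void main(String[] args) {
        Map<String, BiFunction<String, Map<String, Object>, Object>> steps =
            new LinkedHashMap<>();
        steps.put("CONTEXT_RETRIEVAL", (req, ctx) -> "docs for: " + req);
        steps.put("PROMPT_GENERATION", (req, ctx) ->
            req + " | context: " + ctx.get("CONTEXT_RETRIEVAL"));
        Map<String, Object> out = run(
            List.of("CONTEXT_RETRIEVAL", "PROMPT_GENERATION"), steps, "q1");
        System.out.println(out.get("PROMPT_GENERATION"));
        // prints: q1 | context: docs for: q1
    }
}
```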
Step-by-Step Deep Dive
1. CONTEXT_RETRIEVAL
Retrieves relevant context from a vector store to augment the LLM prompt with domain-specific knowledge.
- Component: `ContextRetriever` (or your custom subclass)
- Input: The `AIRequest` with domain context and optional natural language query
- Output: `ContextRetrievalResult` containing the retrieved context string, matching documents, and metadata
- What happens: The retriever extracts a search query from the request, performs a similarity search against the vector store, and formats the matching documents into a context string. This string is passed to the prompt template as `contextInfo`.
The ContextRetrieverRegistry resolves the retriever for the current context type and falls back to the default retriever when no specific registration exists. If retrieval returns no matches, the downstream prompt still runs with an empty or low-signal context string.
2. PREPROCESSING
Collects baseline system metadata before prompt generation.
- What happens: The built-in `PreprocessingStep` stores a formatted metadata string containing request ID, request type, context type, and processing time. Custom steps can still add validation or normalization logic by targeting the same `PREPROCESSING` phase.
- Customizable: Implement the `PipelineStep` interface to add your own preprocessing logic (see Custom Pipeline Steps below).
3. PROMPT_GENERATION
Constructs the final prompt from the domain system prompt, user prompt, and retrieved context.
- Component: Your `PromptTemplate` implementation (matched by `TemplateType`)
- What happens: The framework calls `generateSystemPrompt()` and `generateUserPrompt()` on the matching template. For standard templates, JSON format instructions are automatically appended to the system prompt. For streaming templates, the streaming protocol instructions and a JSON schema example are injected.
- Result: A Spring AI `Prompt` object with system and user messages, ready for LLM execution.
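As an illustration of the append behavior for standard templates, here is a minimal sketch. The instruction wording, method name, and schema-example format are invented for this example, not the framework's actual text:

```java
// Illustrative sketch (not the framework's exact wording): how JSON format
// instructions might be appended to a domain system prompt for standard
// (non-streaming) templates before LLM execution.
public class PromptAssembly {

    static final String JSON_INSTRUCTIONS =
        "Respond ONLY with a JSON object matching the schema below. "
        + "Do not include any text outside the JSON.";

    public static String buildSystemPrompt(String domainPrompt,
                                           String schemaExample) {
        return domainPrompt + "\n\n" + JSON_INSTRUCTIONS
            + "\n\nSchema example:\n" + schemaExample;
    }

    public static void main(String[] args) {
        String prompt = buildSystemPrompt(
            "You are a product recommendation assistant.",
            "{\"products\": [], \"reason\": \"\"}");
        System.out.println(prompt);
    }
}
```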
4. LLM_EXECUTION
Sends the constructed prompt to the LLM via UnifiedLLMOrchestrator.
- Component: `UnifiedLLMOrchestrator`
- What happens: The orchestrator selects the model based on the request's tier or analysis level, applies registered Advisors, and sends the prompt to the LLM. In standard mode, it returns the full text response. In streaming mode, it returns a `Flux<String>` of chunks.
- Configuration: Model selection and tier timeouts are configured through `spring.ai.security` and `spring.ai.security.tiered`. Streaming and retrieval behavior are configured separately through `contexa.streaming` and `contexa.rag`. See LLM & Models.
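Tier-based routing can be pictured as a simple lookup with a fallback. The tier names and model identifiers below are placeholders, not values from the framework's configuration:

```java
import java.util.Map;

// Hypothetical illustration of tier-based model routing. The real mapping
// lives in spring.ai.security.tiered configuration; these tier and model
// names are made up for the sketch.
public class TierModelSelector {

    private static final Map<String, String> TIER_TO_MODEL = Map.of(
        "FAST", "small-model",
        "BALANCED", "medium-model",
        "DEEP", "large-model");

    public static String selectModel(String tier) {
        // Fall back to the balanced model when the tier is unknown.
        return TIER_TO_MODEL.getOrDefault(tier, TIER_TO_MODEL.get("BALANCED"));
    }

    public static void main(String[] args) {
        System.out.println(selectModel("DEEP"));    // prints: large-model
        System.out.println(selectModel("UNKNOWN")); // prints: medium-model
    }
}
```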
5. SOAR_TOOL_EXECUTION (Optional)
Executes tool-driven post-LLM actions before response parsing when the pipeline configuration includes the SOAR phase.
- Config step: `PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION`
- What happens: The standard executor can forward the LLM result into a tool-execution phase before parsing. `ResponseParsingStep` checks this phase first and falls back to the raw `LLM_EXECUTION` result when no SOAR phase is present.
- Default behavior: The built-in default pipeline configuration does not add this step automatically.
6. RESPONSE_PARSING
Parses the raw LLM text output into your typed response object.
- What happens: The framework extracts JSON from the LLM response (handling markdown code blocks, extra text, etc.), then deserializes it into your `AIResponse` subclass using `BeanOutputConverter`. If JSON extraction fails, a fallback parser attempts bracket balancing and repair.
- Streaming note: This step is not used in streaming mode. Instead, the `JsonStreamingProcessor` handles JSON extraction from the stream using protocol markers.
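The extraction logic can be sketched as follows. This is a simplified stand-in for the framework's parser: it handles a markdown code fence and brace balancing, but it omits the repair pass and naively ignores the possibility of braces inside JSON string values:

```java
// Simplified sketch of the kind of extraction RESPONSE_PARSING performs:
// strip a markdown code fence if present, then take the first
// brace-balanced JSON object found in the text.
public class JsonExtractor {

    public static String extractJson(String llmOutput) {
        String text = llmOutput;
        int fence = text.indexOf("```");
        if (fence >= 0) {
            int start = text.indexOf('\n', fence);  // skip the fence line
            int end = text.indexOf("```", start);
            if (start >= 0 && end > start) {
                text = text.substring(start + 1, end);
            }
        }
        int open = text.indexOf('{');
        if (open < 0) return null;
        int depth = 0;
        for (int i = open; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '{') depth++;
            if (c == '}' && --depth == 0) {
                return text.substring(open, i + 1);
            }
        }
        return null; // unbalanced: the framework's repair pass would run here
    }

    public static void main(String[] args) {
        String raw = "Here is the result:\n```json\n{\"score\": 0.9}\n```\nDone.";
        System.out.println(extractJson(raw)); // prints: {"score": 0.9}
    }
}
```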
7. POSTPROCESSING
Final enrichment, validation, and domain-specific processing of the parsed response.
- Component: `DomainResponseProcessor` (optional, custom)
- What happens: If a `DomainResponseProcessor` is registered that supports the request's template key, its `wrapResponse()` method is called to transform or enrich the parsed response. Execution metadata (timing, request ID) is also attached.
- Streaming note: This step is not used in streaming mode.
Standard vs Streaming Pipeline
The pipeline operates in two modes depending on the PipelineConfiguration:
| Aspect | Standard Pipeline | Streaming Pipeline |
|---|---|---|
| Steps | Default 6 steps, with optional SOAR_TOOL_EXECUTION if explicitly configured | First 4 steps (through LLM_EXECUTION) |
| Executor | UniversalPipelineExecutor | StreamingUniversalPipelineExecutor |
| Return type | Mono<R> (typed response) | Flux<String> (chunk stream) |
| JSON handling | BeanOutputConverter in RESPONSE_PARSING | JsonStreamingProcessor with protocol markers |
| Config flag | enableStreaming(false) (default) | enableStreaming(true) |
| Factory method | PipelineConfiguration.createPipelineConfig() | PipelineConfiguration.createStreamPipelineConfig() |
Standard Pipeline Configuration
```java
// Default 6-step pipeline, no streaming
PipelineConfiguration config = PipelineConfiguration.builder()
    .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
    .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
    .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
    .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
    .addStep(PipelineConfiguration.PipelineStep.RESPONSE_PARSING)
    .addStep(PipelineConfiguration.PipelineStep.POSTPROCESSING)
    .timeoutSeconds(300)
    .build();

Mono<MyResponse> result = orchestrator.execute(
    aiRequest, config, MyResponse.class);
```
Streaming Pipeline Configuration
```java
// First 4 steps, streaming enabled
PipelineConfiguration config = PipelineConfiguration.builder()
    .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
    .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
    .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
    .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
    .enableStreaming(true)
    .timeoutSeconds(300)
    .build();

Flux<String> chunks = orchestrator.executeStream(
    aiRequest, config);
```
You can also use the built-in factory methods:
```java
// Equivalent to the manual configurations above
PipelineConfiguration.createPipelineConfig();       // standard
PipelineConfiguration.createStreamPipelineConfig(); // streaming
```
If you need the optional SOAR phase, build the configuration manually and add PipelineConfiguration.PipelineStep.SOAR_TOOL_EXECUTION between LLM_EXECUTION and RESPONSE_PARSING.
RAG Integration Guide
RAG (Retrieval-Augmented Generation) enhances LLM responses by providing relevant context from your data. The CONTEXT_RETRIEVAL step handles this automatically when a ContextRetriever is available.
How RAG Works in the Pipeline
- The `CONTEXT_RETRIEVAL` step calls your `ContextRetriever.retrieveContext()`
- The retriever searches the vector store for documents similar to the request query
- Matching documents are formatted into a context string
- The context string is passed to `generateUserPrompt(request, contextInfo)` in the `PROMPT_GENERATION` step
- Your prompt template includes this context in the user prompt sent to the LLM
Implementing a ContextRetriever
```java
@Component
public class MyContextRetriever extends ContextRetriever {

    public MyContextRetriever(
            VectorStore vectorStore,
            ContexaRagProperties ragProperties) {
        super(vectorStore, ragProperties);
    }

    @Override
    public ContextRetrievalResult retrieveContext(
            AIRequest<? extends DomainContext> request) {
        // Extract search query from your domain context
        String query = extractQueryFromRequest(request);

        // Search the vector store
        List<Document> docs = vectorStore
            .similaritySearch(query);

        // Format results as context string
        String contextInfo = docs.stream()
            .map(doc -> doc.getText())
            .collect(Collectors.joining("\n\n"));

        return new ContextRetrievalResult(
            contextInfo, docs,
            Map.of("retrieverType", "custom",
                "docCount", docs.size()));
    }
}
```
Storing Documents for RAG
Before retrieval works, you need documents in your vector store. Use Spring AI's VectorStore interface to store documents:
```java
// Store documents with metadata
Document doc = new Document(
    "Product A is a lightweight laptop ideal for travel",
    Map.of("category", "electronics",
        "productId", "PROD-001"));

vectorStore.add(List.of(doc));
```
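For longer source material, it is common to split the text into overlapping chunks before wrapping each one in a Document, so that each stored entry fits the embedding model and context survives across chunk boundaries. This helper is a hypothetical sketch, not part of the framework:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical chunking helper: split long text into fixed-size chunks
// with a configurable overlap between consecutive chunks.
public class Chunker {

    public static List<String> chunk(String text, int chunkSize, int overlap) {
        List<String> chunks = new ArrayList<>();
        int step = chunkSize - overlap;
        for (int start = 0; start < text.length(); start += step) {
            int end = Math.min(start + chunkSize, text.length());
            chunks.add(text.substring(start, end));
            if (end == text.length()) break; // last chunk reached
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> chunks = chunk("abcdefghij", 4, 2);
        System.out.println(chunks); // prints: [abcd, cdef, efgh, ghij]
    }
}
```

Each resulting chunk would then become its own `Document` (typically with metadata pointing back to the source) before the `vectorStore.add(...)` call shown above.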
The default retriever works with the configured Spring AI `VectorStore`. When PgVector is wired, use the RAG defaults below and configure the vector store connection separately:
```yaml
contexa:
  rag:
    defaults:
      top-k: 10
      similarity-threshold: 0.7
```
See AI Configuration for all RAG properties.
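The interaction of these two defaults can be sketched as a filter-then-limit operation: drop matches below the similarity threshold, then keep at most top-k of the rest, highest similarity first. The document IDs and scores below are made up for illustration:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch of how similarity-threshold and top-k combine during retrieval.
public class RetrievalFilter {

    public static List<String> filter(
            Map<String, Double> scoredDocs, double threshold, int topK) {
        return scoredDocs.entrySet().stream()
            .filter(e -> e.getValue() >= threshold)   // threshold first
            .sorted(Map.Entry.<String, Double>comparingByValue(
                Comparator.reverseOrder()))           // best matches first
            .limit(topK)                              // then cap at top-k
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Double> scored = Map.of(
            "doc-a", 0.91, "doc-b", 0.65, "doc-c", 0.78);
        System.out.println(filter(scored, 0.7, 2)); // prints: [doc-a, doc-c]
    }
}
```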
Custom Pipeline Steps
You can add custom processing at any stage by implementing the PipelineStep interface. Custom steps are auto-discovered as Spring beans and inserted into the pipeline based on their getConfigStep() and getOrder().
```java
@Component
public class InputNormalizationStep implements PipelineStep {

    @Override
    public <T extends DomainContext> Mono<Object> execute(
            AIRequest<T> request,
            PipelineExecutionContext context) {
        // Normalize whitespace, trim input, add metadata
        String normalized = request.getContext()
            .getDomainType().trim().toUpperCase();
        context.addMetadata("normalizedType", normalized);
        context.addMetadata("processedAt",
            Instant.now().toString());
        return Mono.just(request);
    }

    @Override
    public PipelineConfiguration.PipelineStep getConfigStep() {
        // Runs during the PREPROCESSING phase
        return PipelineConfiguration.PipelineStep.PREPROCESSING;
    }

    @Override
    public int getOrder() {
        // Lower order = runs first within the phase
        // Default steps run at order 100
        return 50;
    }
}
```
Multiple custom steps can target the same pipeline phase. They execute in getOrder() order, with lower values running first.
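That ordering rule amounts to a simple sort by `getOrder()`. The `Step` interface and step names below are a simplified stand-in for the framework's `PipelineStep` beans, used only to make the ordering concrete:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Minimal sketch of how several steps targeting the same phase are ordered:
// sort ascending by order, so order 50 runs before the default steps at 100.
public class StepOrdering {

    public interface Step {
        String name();
        int order();
    }

    public record SimpleStep(String name, int order) implements Step {}

    public static List<String> executionOrder(List<Step> stepsInPhase) {
        return stepsInPhase.stream()
            .sorted(Comparator.comparingInt(Step::order))
            .map(Step::name)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
            new SimpleStep("DefaultPreprocessing", 100),
            new SimpleStep("InputNormalization", 50));
        System.out.println(executionOrder(steps));
        // prints: [InputNormalization, DefaultPreprocessing]
    }
}
```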
Custom Response Processor
Implement DomainResponseProcessor to transform or enrich the parsed response during the POSTPROCESSING step:
```java
@Component
public class ProductResponseProcessor
        implements DomainResponseProcessor {

    @Override
    public boolean supports(String templateKey) {
        return "PRODUCT_RECOMMENDATION".equals(templateKey);
    }

    @Override
    public boolean supportsType(Class<?> responseType) {
        return ProductRecommendationResponse.class
            .isAssignableFrom(responseType);
    }

    @Override
    public Object wrapResponse(
            Object parsedData,
            PipelineExecutionContext context) {
        // Enrich the parsed response with execution metadata
        if (parsedData instanceof ProductRecommendationResponse resp) {
            resp.withMetadata("pipelineExecutionTime",
                context.getElapsedTimeMs());
            resp.withMetadata("modelUsed",
                context.getMetadata("modelName"));
        }
        return parsedData;
    }
}
```
Configuration
Key pipeline and RAG knobs:
| Property | Description | Default |
|---|---|---|
| `PipelineConfiguration.timeoutSeconds(...)` | Per-request pipeline timeout configured in code | 300 |
| `contexa.streaming.timeout` | Maximum duration allowed for streaming output assembly | PT5M |
| `contexa.rag.defaults.top-k` | Default number of similar documents to retrieve | 10 |
| `contexa.rag.defaults.similarity-threshold` | Default similarity threshold applied to retrieval | 0.7 |
See AI Configuration for all pipeline, RAG, and streaming properties.
API Reference
PipelineOrchestrator
public class PipelineOrchestrator
Constructor: public PipelineOrchestrator(List<PipelineExecutor> executors)
PipelineConfiguration
public class PipelineConfiguration
Fields
| Property | Type | Default | Description |
|---|---|---|---|
| `steps` | `List<PipelineStep>` | Empty | Ordered list of pipeline steps. |
| `parameters` | `Map<String, Object>` | Empty | Key-value parameters for step configuration. |
| `timeoutSeconds` | `int` | 300 | Maximum execution time. |
| `enableStreaming` | `boolean` | false | Whether to use streaming executor. |
| `name` | `String` | null | Optional pipeline name. |
| `description` | `String` | null | Optional description. |
Builder Methods
PipelineStep Enum
| Value | Description |
|---|---|
| `CONTEXT_RETRIEVAL` | RAG context retrieval from vector stores. |
| `PREPROCESSING` | Request validation, normalization, and enrichment. |
| `PROMPT_GENERATION` | Constructs the final prompt from templates and context. |
| `LLM_EXECUTION` | Sends the prompt to the LLM via UnifiedLLMOrchestrator. |
| `SOAR_TOOL_EXECUTION` | Optional tool-execution phase between the raw LLM result and response parsing. |
| `RESPONSE_PARSING` | Parses the LLM output into the typed response. |
| `POSTPROCESSING` | Final enrichment, validation, and domain processing. |
Factory Methods
PipelineStep Interface (custom steps)
public interface PipelineStep
PipelineExecutor Interface
public interface PipelineExecutor
Built-in: UniversalPipelineExecutor (standard) and StreamingUniversalPipelineExecutor (streaming).
ContextRetriever
public class ContextRetriever
Constructor: public ContextRetriever(VectorStore vectorStore, ContexaRagProperties ragProperties)
ContextRetrievalResult
Constructor: ContextRetrievalResult(String contextInfo, List<Document> documents, Map<String, Object> metadata)
- `getContextInfo()` — The formatted context string passed to the prompt template
- `getDocuments()` — The raw matching documents
- `getMetadata()` — Retrieval metadata (retriever type, timestamps, etc.)
DomainResponseProcessor Interface
public interface DomainResponseProcessor