contexa-core

Building Custom AI

Build your own AI-powered features using the Contexa AI Diagnosis Process. This guide walks you through creating a complete custom AI feature — from defining domain types to exposing REST endpoints — using the Strategy, Lab, and Pipeline architecture.

What You'll Build

By following this guide, you will create a fully working AI feature that:

  • Accepts domain-specific input via REST API
  • Routes the request through the AI Diagnosis Process
  • Generates structured LLM responses with JSON parsing
  • Supports both standard (synchronous) and streaming (SSE) modes
  • Optionally retrieves RAG context from a vector store

End-to-End Flow

Every AI request in Contexa follows this execution path:

Controller → AINativeProcessor → AIStrategyRegistry → Strategy → Lab → Pipeline (6 Steps)

You implement the Controller, Strategy, and Lab (plus their supporting types). The framework handles everything else: request routing, distributed locking, audit logging, LLM execution, and response parsing.

Example Domain
This guide uses a Product Recommendation feature as the running example. The same pattern applies to any domain: sentiment analysis, code review, document summarization, or any custom AI task.

Architecture

The AI Diagnosis Process uses a three-layer architecture. Each layer has a clear responsibility:

| Layer | Class | Responsibility |
| --- | --- | --- |
| Strategy | AbstractAIStrategy | Routes requests by DiagnosisType, validates input, delegates to the correct Lab. |
| Lab | AbstractAILab | Collects and enriches domain data, then delegates to the Pipeline for LLM processing. |
| Pipeline | PipelineOrchestrator | Executes 6 processing steps: context retrieval, preprocessing, prompt generation, LLM execution, response parsing, postprocessing. |
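The six-step sequence can be pictured as a chain of transformations over a shared context. The sketch below is a minimal, framework-free illustration of that idea; all names in it are illustrative, not the Contexa API:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Illustrative only: a toy step chain mimicking how a pipeline
// orchestrator might run named steps in order over a shared context.
public class PipelineSketch {

    record Step(String name, UnaryOperator<Map<String, String>> fn) {}

    static Map<String, String> run(List<Step> steps, Map<String, String> ctx) {
        for (Step step : steps) {
            ctx = step.fn().apply(ctx); // each step enriches the context
        }
        return ctx;
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
            new Step("CONTEXT_RETRIEVAL", c -> put(c, "context", "retrieved docs")),
            new Step("PROMPT_GENERATION", c -> put(c, "prompt", "user prompt + " + c.get("context"))),
            new Step("LLM_EXECUTION",     c -> put(c, "raw", "llm output for: " + c.get("prompt"))),
            new Step("RESPONSE_PARSING",  c -> put(c, "parsed", c.get("raw").toUpperCase()))
        );
        Map<String, String> result = run(steps, new LinkedHashMap<>());
        System.out.println(result.get("parsed"));
    }

    static Map<String, String> put(Map<String, String> c, String k, String v) {
        c.put(k, v);
        return c;
    }
}
```

The real PipelineOrchestrator is reactive and configurable per request; this sketch only shows the ordered, context-passing shape of the execution.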

What You Implement vs. What the Framework Provides

| Component | You Implement | Purpose |
| --- | --- | --- |
| DomainContext | Required | Your domain input data model |
| AIResponse | Required | Your domain output data model |
| PromptTemplate | Required | System/user prompts and JSON schema for the LLM |
| Lab | Required | Data enrichment and pipeline orchestration |
| Strategy | Required | Request validation and lab delegation |
| ContextRetriever | Optional | RAG context from a vector store |
| DomainResponseProcessor | Optional | Custom response post-processing |
| Controller | Required | REST endpoints for standard and streaming modes |

Step 1: Define Domain Types

Start by defining your input context and output response. These are the data models your AI feature works with.

DomainContext — Your Input

Extend DomainContext to carry your domain-specific input data. The only required method is getDomainType(), which returns a string identifier for your domain.

Java
@Getter
public class ProductContext extends DomainContext {

    private final String productId;
    private final String category;
    private final List<String> userPreferences;

    public ProductContext(String productId, String category,
                          List<String> userPreferences) {
        this.productId = productId;
        this.category = category;
        this.userPreferences = userPreferences;
    }

    @Override
    public String getDomainType() {
        return "PRODUCT_RECOMMENDATION";
    }
}

DomainContext provides built-in fields: contextId (auto-generated UUID), userId, sessionId, organizationId, and a metadata map. Use addMetadata(key, value) to attach additional key-value pairs.
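For orientation, here is a rough sketch of the base-class surface just described. The field and method names come from this doc; the implementation details are assumptions, not the actual Contexa source:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Rough sketch of the DomainContext surface described above.
// Names match the docs; the implementation itself is an assumption.
public abstract class DomainContextSketch {
    private final String contextId = UUID.randomUUID().toString(); // auto-generated
    private final Map<String, Object> metadata = new HashMap<>();

    public abstract String getDomainType();

    public String getContextId() { return contextId; }

    public void addMetadata(String key, Object value) {
        metadata.put(key, value);
    }

    public Object getMetadata(String key) {
        return metadata.get(key);
    }

    // Tiny concrete example for demonstration
    static class ExampleContext extends DomainContextSketch {
        @Override public String getDomainType() { return "EXAMPLE"; }
    }

    public static void main(String[] args) {
        ExampleContext ctx = new ExampleContext();
        ctx.addMetadata("channel", "web");
        System.out.println(ctx.getDomainType() + " " + ctx.getMetadata("channel"));
    }
}
```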

AIResponse — Your Output

Extend AIResponse to define the structured output the LLM will produce. This class is used by the JSON parser to deserialize the LLM response.

Java
@Getter @Setter
@NoArgsConstructor
public class ProductRecommendationResponse extends AIResponse {

    private List<Recommendation> recommendations;
    private double confidence;
    private String reasoning;

    @Getter @Setter
    @NoArgsConstructor
    public static class Recommendation {
        private String productName;
        private String reason;
        private double score;
    }
}
Important
Your AIResponse subclass must have a no-argument constructor and setter methods (or use @NoArgsConstructor and @Setter). The JSON parser uses these to deserialize the LLM output.
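To see why, here is roughly what a reflection-based mapper does when materializing a response object: instantiate the class empty, then populate it one setter at a time. This is a simplified illustration of the mechanism, not the actual Contexa parser:

```java
import java.lang.reflect.Method;

// Simplified illustration of why a no-arg constructor and setters are
// required: a reflection-based mapper instantiates the class empty and
// then populates it via setters. Not the actual Contexa parser.
public class SetterInjectionDemo {

    public static class Target {
        private String reasoning;
        public Target() {}                                          // required
        public void setReasoning(String r) { this.reasoning = r; }  // required
        public String getReasoning() { return reasoning; }
    }

    static Target materialize(String value) {
        try {
            Target t = Target.class.getDeclaredConstructor().newInstance(); // no-arg ctor
            Method setter = Target.class.getMethod("setReasoning", String.class);
            setter.invoke(t, value);                                        // setter injection
            return t;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("class is not mapper-friendly", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(materialize("matched preferences").getReasoning());
    }
}
```

If either the no-arg constructor or a setter is missing, the reflective lookup fails at exactly the points marked `required` above.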

Lab Request DTO

Define a simple DTO for the Lab's input. This is the object your Strategy converts the AIRequest into.

Java
@Getter @AllArgsConstructor
public class ProductRecommendationRequest {
    private final String productId;
    private final String category;
    private final List<String> userPreferences;
}

Type Identifiers

TemplateType identifies which prompt template to use. DiagnosisType identifies which strategy handles the request. Both are simple wrapper classes.

Java
// Used in AIRequest to select the prompt template
new TemplateType("PRODUCT_RECOMMENDATION")

// Used in AIRequest to route to the correct strategy
new DiagnosisType("PRODUCT_RECOMMENDATION")
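For intuition, a minimal value-object version of such a wrapper is just a name with value equality, suitable as a registry lookup key. The real TemplateType and DiagnosisType classes may differ in detail; this is a sketch:

```java
import java.util.Objects;

// Minimal sketch of a type-identifier wrapper with value equality,
// usable as a lookup key. The real Contexa classes may carry more.
public final class TypeIdSketch {
    private final String name;

    public TypeIdSketch(String name) {
        this.name = Objects.requireNonNull(name);
    }

    public String name() { return name; }

    @Override public boolean equals(Object o) {
        return o instanceof TypeIdSketch other && name.equals(other.name);
    }

    @Override public int hashCode() { return name.hashCode(); }

    public static void main(String[] args) {
        TypeIdSketch a = new TypeIdSketch("PRODUCT_RECOMMENDATION");
        System.out.println(a.equals(new TypeIdSketch("PRODUCT_RECOMMENDATION"))); // value equality
    }
}
```

Value equality matters because the registry must treat two instances built from the same string as the same key.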

Step 2: Create the Prompt Template

The prompt template defines what the LLM sees: the system prompt (role, rules, output format) and the user prompt (the actual query with data). You need two templates — one for standard mode and one for streaming.

Standard Prompt Template

Extend AbstractStandardPromptTemplate<T> where T is your response type. The framework uses BeanOutputConverter to automatically generate JSON schema instructions from your response class.

Java
@Component
public class ProductPromptTemplate
        extends AbstractStandardPromptTemplate<ProductRecommendationResponse> {

    public ProductPromptTemplate() {
        super(ProductRecommendationResponse.class);
    }

    @Override
    public TemplateType getSupportedType() {
        return new TemplateType("PRODUCT_RECOMMENDATION");
    }

    @Override
    protected String generateDomainSystemPrompt(
            AIRequest<? extends DomainContext> request,
            String systemMetadata) {
        return """
            You are a product recommendation AI assistant.
            Analyze the user's preferences and product category
            to generate personalized recommendations.

            Rules:
            - Provide 3-5 recommendations ranked by relevance
            - Each recommendation must include a reason and score (0.0-1.0)
            - Set overall confidence based on preference match quality
            - Be specific about why each product fits the user's needs
            """;
    }

    @Override
    public String generateUserPrompt(
            AIRequest<? extends DomainContext> request,
            String contextInfo) {
        ProductContext ctx = (ProductContext) request.getContext();
        StringBuilder prompt = new StringBuilder();
        prompt.append("[Query]\n");
        prompt.append("Product ID: ").append(ctx.getProductId()).append("\n");
        prompt.append("Category: ").append(ctx.getCategory()).append("\n");
        prompt.append("User Preferences: ")
              .append(String.join(", ", ctx.getUserPreferences()))
              .append("\n");

        if (contextInfo != null && !contextInfo.isEmpty()) {
            prompt.append("\n[Context]\n").append(contextInfo);
        }
        return prompt.toString();
    }
}

Streaming Prompt Template

For streaming mode, extend AbstractStreamingPromptTemplate. The key difference is getJsonSchemaExample() — you provide a concrete JSON example that the LLM uses as a template for its structured output, wrapped in streaming protocol markers.

Java
@Component
public class ProductStreamingPromptTemplate
        extends AbstractStreamingPromptTemplate {

    @Override
    public TemplateType getSupportedType() {
        return new TemplateType("PRODUCT_RECOMMENDATION_STREAMING");
    }

    @Override
    protected String generateDomainSystemPrompt(
            AIRequest<? extends DomainContext> request,
            String systemMetadata) {
        return """
            You are a product recommendation AI assistant.
            First, provide a natural language analysis of the user's
            preferences, then generate structured recommendations.
            """;
    }

    @Override
    protected String getJsonSchemaExample() {
        return """
            {
              "recommendations": [
                {
                  "productName": "Example Product",
                  "reason": "Matches preference for...",
                  "score": 0.95
                }
              ],
              "confidence": 0.88,
              "reasoning": "Based on the user's preferences..."
            }
            """;
    }

    @Override
    public String generateUserPrompt(
            AIRequest<? extends DomainContext> request,
            String contextInfo) {
        ProductContext ctx = (ProductContext) request.getContext();
        return String.format(
            "Recommend products for category '%s' based on " +
            "preferences: %s",
            ctx.getCategory(),
            String.join(", ", ctx.getUserPreferences()));
    }
}
How Streaming Protocol Works
In streaming mode, the LLM first outputs natural language text (prefixed with ###STREAMING###), then outputs structured JSON between ===JSON_START=== and ===JSON_END=== markers. The framework handles all marker detection and JSON extraction automatically. See Streaming for details.
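If you ever consume the raw stream outside the framework (for example in a custom client), extracting the JSON block comes down to locating the two markers in the accumulated text. A sketch, not framework code:

```java
// Illustrative client-side helper: pull the structured JSON out of an
// accumulated stream using the protocol markers described above.
// The framework does this for you; this sketch is for custom clients.
public class StreamMarkerDemo {
    static final String START = "===JSON_START===";
    static final String END = "===JSON_END===";

    static String extractJson(String accumulated) {
        int s = accumulated.indexOf(START);
        int e = accumulated.indexOf(END);
        if (s < 0 || e < 0 || e < s) {
            return null; // JSON block not (fully) received yet
        }
        return accumulated.substring(s + START.length(), e).trim();
    }

    public static void main(String[] args) {
        String stream = "###STREAMING###Here is my analysis..."
            + "===JSON_START==={\"confidence\":0.88}===JSON_END===";
        System.out.println(extractJson(stream)); // prints {"confidence":0.88}
    }
}
```

Returning null until both markers arrive lets a client keep accumulating chunks and only parse once the JSON block is complete.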

Step 3: Implement the Lab

The Lab is where domain-specific data collection and enrichment happens before the request reaches the Pipeline. Extend AbstractAILab<Req, Res> and override the processing methods.

Lab Execution Pattern

The Lab follows a template method pattern with three execution modes:

  • doProcess(request) — Synchronous execution, returns the response directly
  • doProcessAsync(request) — Asynchronous execution, returns Mono<Res>
  • doProcessStream(request) — Streaming execution, returns Flux<String>

The typical pattern: build an AIRequest, configure the Pipeline, and delegate.

Java
@Component
public class ProductRecommendationLab
        extends AbstractAILab<ProductRecommendationRequest,
                               ProductRecommendationResponse> {

    private final PipelineOrchestrator orchestrator;

    public ProductRecommendationLab(PipelineOrchestrator orchestrator) {
        super("ProductRecommendation");
        this.orchestrator = orchestrator;
    }

    @Override
    protected ProductRecommendationResponse doProcess(
            ProductRecommendationRequest request) throws Exception {
        return doProcessAsync(request).block();
    }

    @Override
    protected Mono<ProductRecommendationResponse> doProcessAsync(
            ProductRecommendationRequest request) {
        AIRequest<ProductContext> aiRequest =
            buildAIRequest(request, "PRODUCT_RECOMMENDATION");

        PipelineConfiguration config = PipelineConfiguration.builder()
            .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
            .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
            .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
            .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
            .addStep(PipelineConfiguration.PipelineStep.RESPONSE_PARSING)
            .addStep(PipelineConfiguration.PipelineStep.POSTPROCESSING)
            .timeoutSeconds(120)
            .build();

        return orchestrator.execute(
            aiRequest, config, ProductRecommendationResponse.class);
    }

    @Override
    protected Flux<String> doProcessStream(
            ProductRecommendationRequest request) {
        AIRequest<ProductContext> aiRequest =
            buildAIRequest(request,
                           "PRODUCT_RECOMMENDATION_STREAMING");

        PipelineConfiguration config = PipelineConfiguration.builder()
            .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
            .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
            .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
            .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
            .enableStreaming(true)
            .timeoutSeconds(120)
            .build();

        return orchestrator.executeStream(aiRequest, config);
    }

    private AIRequest<ProductContext> buildAIRequest(
            ProductRecommendationRequest request,
            String templateName) {
        ProductContext context = new ProductContext(
            request.getProductId(),
            request.getCategory(),
            request.getUserPreferences());

        return new AIRequest<>(
            context,
            new TemplateType(templateName),
            new DiagnosisType("PRODUCT_RECOMMENDATION"));
    }
}

Data Enrichment Pattern

For more complex features, you can enrich the request with additional data before sending it to the Pipeline. This pattern collects domain data (from databases, external APIs, etc.) and adds it as parameters to the AIRequest.

Java
// Inside your Lab, before calling orchestrator:
private AIRequest<ProductContext> enrichRequest(
        AIRequest<ProductContext> request) {
    // Collect domain data from your services
    String productData = productService
        .getProductDetails(request.getContext().getProductId());
    String userHistory = userService
        .getPurchaseHistory(request.getContext().getUserId());

    // Add as parameters - these become available in the prompt
    request.withParameter("productData", productData);
    request.withParameter("userHistory", userHistory);
    request.withParameter("systemMetadata",
        "Product catalog size: " + productService.getCatalogSize());

    return request;
}
Standard vs Streaming Pipeline Config
For standard mode, include all 6 steps (CONTEXT_RETRIEVAL through POSTPROCESSING). For streaming mode, use only the first 4 steps (through LLM_EXECUTION) and set enableStreaming(true). The streaming executor handles response delivery directly as a Flux<String>.

Step 4: Implement the Strategy

The Strategy routes AI requests to the correct Lab based on DiagnosisType. It validates the input, resolves the Lab, converts the request format, and delegates execution.

Extend AbstractAIStrategy<T, R> and implement its abstract methods:

Java
@Component
public class ProductRecommendationStrategy
        extends AbstractAIStrategy<ProductContext,
                                    ProductRecommendationResponse> {

    public ProductRecommendationStrategy(AILabFactory labFactory) {
        super(labFactory);
    }

    @Override
    public DiagnosisType getSupportedType() {
        return new DiagnosisType("PRODUCT_RECOMMENDATION");
    }

    @Override
    public int getPriority() {
        return 100;
    }

    @Override
    protected Class<?> getLabType() {
        return ProductRecommendationLab.class;
    }

    @Override
    protected void validateRequest(
            AIRequest<ProductContext> request)
            throws DiagnosisException {
        if (request.getContext() == null) {
            throw new DiagnosisException(
                getSupportedType().name(),
                "INVALID_REQUEST",
                "ProductContext is required");
        }
        if (request.getContext().getCategory() == null) {
            throw new DiagnosisException(
                getSupportedType().name(),
                "INVALID_REQUEST",
                "Product category is required");
        }
    }

    @Override
    protected Object convertLabRequest(
            AIRequest<ProductContext> request)
            throws DiagnosisException {
        ProductContext ctx = request.getContext();
        return new ProductRecommendationRequest(
            ctx.getProductId(),
            ctx.getCategory(),
            ctx.getUserPreferences());
    }

    @Override
    protected ProductRecommendationResponse processLabExecution(
            Object lab, Object labRequest,
            AIRequest<ProductContext> request) throws Exception {
        ProductRecommendationLab productLab =
            (ProductRecommendationLab) lab;
        return productLab.process(
            (ProductRecommendationRequest) labRequest);
    }

    @Override
    protected Mono<ProductRecommendationResponse>
            processLabExecutionAsync(
                Object lab, Object labRequest,
                AIRequest<ProductContext> request) {
        ProductRecommendationLab productLab =
            (ProductRecommendationLab) lab;
        return productLab.processAsync(
            (ProductRecommendationRequest) labRequest);
    }

    @Override
    protected Flux<String> processLabExecutionStream(
            Object lab, Object labRequest,
            AIRequest<ProductContext> request) {
        ProductRecommendationLab productLab =
            (ProductRecommendationLab) lab;
        return productLab.processStream(
            (ProductRecommendationRequest) labRequest);
    }

    @Override
    public boolean supportsStreaming() {
        return true;
    }
}

Template Method Execution Order

AbstractAIStrategy uses a template method pattern. When execute(), executeAsync(), or executeStream() is called, the framework runs these steps in order:

  1. validateRequest(request) — Validate the incoming AIRequest
  2. getRequiredLab() — Resolve the Lab via AILabFactory using getLabType()
  3. convertLabRequest(request) — Convert the AIRequest to the Lab's request format
  4. processLabExecution(lab, labRequest, request) — Execute the Lab and return the response
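The ordering above can be seen in a stripped-down template method: a final execute() fixes the sequence, and subclasses fill in the steps. This is a generic sketch of the pattern, not the actual AbstractAIStrategy source:

```java
import java.util.ArrayList;
import java.util.List;

// Stripped-down sketch of the template-method ordering described above.
// Generic illustration of the pattern, not the AbstractAIStrategy source.
public class TemplateMethodDemo {

    abstract static class Strategy {
        final List<String> trace = new ArrayList<>(); // records call order

        final String execute(String request) {
            validate(request);                    // 1. validate input
            String lab = resolveLab();            // 2. resolve the lab
            String labRequest = convert(request); // 3. convert request format
            return process(lab, labRequest);      // 4. delegate execution
        }

        abstract void validate(String request);
        abstract String resolveLab();
        abstract String convert(String request);
        abstract String process(String lab, String labRequest);
    }

    static class DemoStrategy extends Strategy {
        void validate(String r) { trace.add("validate"); }
        String resolveLab() { trace.add("resolveLab"); return "lab"; }
        String convert(String r) { trace.add("convert"); return r + "-dto"; }
        String process(String lab, String req) { trace.add("process"); return lab + ":" + req; }
    }

    public static void main(String[] args) {
        DemoStrategy s = new DemoStrategy();
        System.out.println(s.execute("req")); // lab:req-dto
        System.out.println(s.trace);          // [validate, resolveLab, convert, process]
    }
}
```

Because execute() is final, subclasses can change what each step does but never the order in which the steps run.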

Step 5: Add RAG Context (Optional)

If your feature needs to augment LLM prompts with relevant context from a vector store, extend the ContextRetriever class. The Pipeline's CONTEXT_RETRIEVAL step calls registered retrievers automatically.

Java
@Component
public class ProductContextRetriever extends ContextRetriever {

    public ProductContextRetriever(
            VectorStore vectorStore,
            ContexaRagProperties ragProperties) {
        super(vectorStore, ragProperties);
    }

    @Override
    public ContextRetrievalResult retrieveContext(
            AIRequest<? extends DomainContext> request) {
        if (!(request.getContext() instanceof ProductContext ctx)) {
            return new ContextRetrievalResult("", List.of(), Map.of());
        }

        String query = ctx.getCategory() + " " +
            String.join(" ", ctx.getUserPreferences());

        List<Document> docs = vectorStore
            .similaritySearch(query);

        String contextInfo = docs.stream()
            .map(Document::getText)
            .collect(Collectors.joining("\n\n"));

        return new ContextRetrievalResult(
            contextInfo,
            docs,
            Map.of("retrieverType", "product",
                    "timestamp", Instant.now().toString()));
    }
}

The retrieved contextInfo is automatically passed as the contextInfo parameter to your prompt template's generateUserPrompt() method during the PROMPT_GENERATION step.

Query Source
The default ContextRetriever reads request.getParameter("naturalLanguageQuery", String.class). If you only call setNaturalLanguageQuery(...), override extractQueryFromRequest() or also add the same value as a request parameter.
Vector Store Setup
To use RAG, you need a vector store configured. Contexa supports pgvector out of the box. See AI Configuration for RAG properties including contexa.rag.defaults.top-k and contexa.rag.defaults.similarity-threshold.

Step 6: Wire It Together

Create a REST controller that builds AIRequest objects and delegates to the streaming service. This is the entry point for your AI feature.

Two Execution Paths

You can invoke your AI feature through two paths:

  • Strategy/Lab Path — Uses AINativeProcessor (or StandardStreamingService) which routes through AIStrategyRegistry to your Strategy, then Lab, then Pipeline. Provides distributed locking and audit logging.
  • Direct Lab Path — Injects your Lab directly and calls processAsync() or processStream(). Simpler but bypasses strategy routing and audit.

Controller with Strategy/Lab Path (Recommended)

Java
@RestController
@RequestMapping("/api/recommend")
public class ProductRecommendationController {

    private final StandardStreamingService streamingService;
    private final AINativeProcessor<ProductContext> aiProcessor;

    public ProductRecommendationController(
            StandardStreamingService streamingService,
            AINativeProcessor<ProductContext> aiProcessor) {
        this.streamingService = streamingService;
        this.aiProcessor = aiProcessor;
    }

    @PostMapping
    public Mono<ResponseEntity<ProductRecommendationResponse>>
            recommend(@RequestBody RecommendRequest body) {
        AIRequest<ProductContext> request = buildRequest(body,
            "PRODUCT_RECOMMENDATION");
        return streamingService.process(
                request, aiProcessor,
                ProductRecommendationResponse.class)
            .map(ResponseEntity::ok);
    }

    @PostMapping(value = "/stream",
        produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> recommendStream(
            @RequestBody RecommendRequest body) {
        AIRequest<ProductContext> request = buildRequest(body,
            "PRODUCT_RECOMMENDATION_STREAMING");
        return streamingService.stream(request, aiProcessor);
    }

    private AIRequest<ProductContext> buildRequest(
            RecommendRequest body, String templateName) {
        ProductContext context = new ProductContext(
            body.getProductId(),
            body.getCategory(),
            body.getPreferences());
        return new AIRequest<>(
            context,
            new TemplateType(templateName),
            new DiagnosisType("PRODUCT_RECOMMENDATION"));
    }
}

Application Setup

Java
@SpringBootApplication
@EnableAISecurity  // from io.contexa.contexacommon.annotation
public class MyAiApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyAiApplication.class, args);
    }
}

Minimal Configuration

YAML
spring:
  ai:
    ollama:
      base-url: http://localhost:11434
      chat:
        model: qwen2.5:7b
    security:
      layer1:
        model: qwen2.5:14b
      layer2:
        model: exaone3.5:latest
      tiered:
        layer1:
          timeout-ms: 30000
        layer2:
          timeout-ms: 60000

contexa:
  streaming:
    timeout: PT5M

See AI Configuration for the complete property reference including RAG, advisor, and model tier settings.

Configuration

Key properties that affect your custom AI feature:

| Property | Description | Default |
| --- | --- | --- |
| spring.ai.security.layer1.model | Primary model used for layer 1 analysis | qwen2.5:14b |
| spring.ai.security.layer2.model | Primary model used for layer 2 analysis | exaone3.5:latest |
| spring.ai.security.tiered.layer1.timeout-ms | Timeout applied to layer 1 model execution | 30000 |
| spring.ai.security.tiered.layer2.timeout-ms | Timeout applied to layer 2 model execution | 60000 |
| contexa.streaming.timeout | Streaming timeout used when the selected strategy emits chunked output | PT5M |
Full Configuration Reference
For the complete list of AI properties, see AI Configuration.
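The contexa.streaming.timeout value (PT5M above) uses ISO-8601 duration syntax. Assuming it binds to java.time.Duration, as Spring Boot duration properties usually do, you can sanity-check such values directly:

```java
import java.time.Duration;

// PT5M is an ISO-8601 duration; java.time parses it directly.
public class DurationCheck {
    public static void main(String[] args) {
        Duration timeout = Duration.parse("PT5M");
        System.out.println(timeout.toSeconds()); // 300
    }
}
```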

API Reference

AIStrategy<T, R> Interface
public interface AIStrategy<T extends DomainContext, R extends AIResponse>
getSupportedType() DiagnosisType
Returns the DiagnosisType this strategy handles.
getPriority() int
Priority used when multiple strategies support the same type; the lowest value wins.
execute(AIRequest<T> request, Class<R> responseType) R
Synchronous execution.
executeAsync(AIRequest<T> request, Class<R> responseType) Mono<R>
Asynchronous execution.
executeStream(AIRequest<T> request, Class<R> responseType) Flux<String>
Streaming execution.
supportsStreaming() boolean
Whether this strategy supports streaming. Default: false.
AbstractAIStrategy<T, R> Abstract Methods
public abstract class AbstractAIStrategy<T extends DomainContext, R extends AIResponse> implements AIStrategy<T, R>

Constructor: protected AbstractAIStrategy(AILabFactory labFactory)

getLabType() Class<?>
Returns the Lab class this strategy delegates to.
validateRequest(AIRequest<T> request) void
Validates the request. Throw DiagnosisException for invalid input.
convertLabRequest(AIRequest<T> request) Object
Converts AIRequest to the Lab's request type.
processLabExecution(Object lab, Object labRequest, AIRequest<T> request) R
Synchronous lab execution.
processLabExecutionAsync(Object lab, Object labRequest, AIRequest<T> request) Mono<R>
Asynchronous lab execution.
processLabExecutionStream(Object lab, Object labRequest, AIRequest<T> request) Flux<String>
Streaming lab execution. Override to enable streaming.
AbstractAILab<Req, Res>
public abstract class AbstractAILab<Req, Res> implements AILab<Req, Res>

Constructor: protected AbstractAILab(String labName)

doProcess(Req request) Res
Override for synchronous processing logic.
doProcessAsync(Req request) Mono<Res>
Override for asynchronous processing. Default wraps doProcess() in Mono.
doProcessStream(Req request) Flux<String>
Override for streaming processing.
validateRequest(Req request) void
Called before processing. Override to add lab-level validation.
preProcess(Req request) void
Called before doProcess(). Override for pre-processing hooks.
postProcess(Req request, Res result) void
Called after doProcess(). Override for post-processing hooks.
AIRequest<T>
public class AIRequest<T extends DomainContext>

Constructor: public AIRequest(T context, TemplateType templateType, DiagnosisType diagnosisType)

withParameter(String key, Object value) AIRequest<T>
Adds a parameter to the request. Returns this for chaining.
getParameter(String key, Class<P> type) P
Retrieves a typed parameter by key.
getContext() T
Returns the domain context.
getPromptTemplate() TemplateType
Returns the template type for prompt selection.
setNaturalLanguageQuery(String query) void
Sets the natural-language query field on the request for use by your own templates or strategies. Note that the default ContextRetriever reads the "naturalLanguageQuery" request parameter via getParameter(...), falling back to context.toString() when that parameter is absent.
AIStrategyRegistry
public class AIStrategyRegistry
getStrategy(DiagnosisType diagnosisType) AIStrategy<T, R>
Returns the strategy for the given type. Throws DiagnosisException if not found.
executeStrategyAsync(AIRequest<T> request, Class<R> responseType) Mono<R>
Looks up and executes the strategy asynchronously.
executeStrategyStream(AIRequest<T> request, Class<R> responseType) Flux<String>
Looks up and executes the strategy in streaming mode.
AINativeProcessor<T>
public final class AINativeProcessor<T extends DomainContext> implements AICoreOperations<T>

The primary entry point for AI processing. Wraps strategy execution with distributed locking and audit logging.

process(AIRequest<T> request, Class<R> responseType) Mono<R>
Processes the request through the strategy layer with distributed lock and audit trail.
processStream(AIRequest<T> request) Flux<String>
Streaming processing through the strategy layer.
PromptTemplate Interfaces

AbstractStandardPromptTemplate<T>

Constructor: protected AbstractStandardPromptTemplate(Class<T> responseType)

getSupportedType() TemplateType
Returns the template type this template handles.
generateDomainSystemPrompt(AIRequest<?> request, String systemMetadata) String
Returns the domain-specific system prompt. The framework appends JSON format instructions automatically.
generateUserPrompt(AIRequest<?> request, String contextInfo) String
Returns the user prompt. contextInfo contains RAG-retrieved context if available.

AbstractStreamingPromptTemplate

Same methods as above, plus:

getJsonSchemaExample() String
Returns a JSON example used as a template for the LLM's structured output in streaming mode.