contexa-core

Streaming

Contexa provides real-time Server-Sent Events (SSE) streaming for AI responses. The streaming subsystem processes raw LLM chunks, detects protocol markers, extracts structured JSON payloads, and delivers them to clients through two execution paths: the Strategy/Lab path and the direct Pipeline path.

Overview: Two Streaming Paths

Streaming supports two execution paths, both producing Flux<ServerSentEvent<String>>:

| Path | Entry Point | Use When |
|------|-------------|----------|
| Strategy/Lab | streamingService.stream(request, aiProcessor) | Full AI Diagnosis Process with routing, audit, and distributed locking |
| Direct Pipeline | streamingService.stream(request, pipelineOrchestrator) | Simple LLM calls or custom prompts without strategy routing |

Both paths share the same streaming protocol, marker detection, and client-side handling.

Streaming Architecture

The data flow from LLM to client browser:

LLM / Pipeline Chunks → StreamingContext → SentenceBuffer + Marker State → SSE Events → Client Browser

The DefaultStandardStreamingService orchestrates the SSE path:

  1. Calls AICoreOperations.processStream() or PipelineOrchestrator.executeStream() to obtain a Flux<String> of raw chunks and protocol markers
  2. Appends each chunk to StreamingContext, which tracks final-response markers and sentence-buffering state
  3. Emits narrative text progressively through SentenceBuffer
  4. Forwards ###GENERATING_RESULT### to the SSE client immediately when the upstream emits it
  5. Extracts the structured tail once the accumulated data contains ###FINAL_RESPONSE### and emits it as the final SSE payload
  6. Wraps each outbound item in a ServerSentEvent for delivery
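The marker handling in steps 4 and 5 reduces to plain string logic. The following is a simplified, hypothetical sketch of that behavior (the class and method names here are illustrative, not the actual DefaultStandardStreamingService internals):

```java
// Hypothetical sketch of the final-response handling described in steps 4-5.
// Not the actual DefaultStandardStreamingService code.
final class MarkerSketch {
    static final String GENERATING = "###GENERATING_RESULT###";
    static final String FINAL = "###FINAL_RESPONSE###";

    /** Step 4: true if the chunk should be forwarded to the client immediately. */
    static boolean isGeneratingSignal(String chunk) {
        return chunk.contains(GENERATING);
    }

    /**
     * Step 5: extracts the structured tail once the aggregated stream contains
     * the final-response marker, or returns null if it has not arrived yet.
     */
    static String extractFinalPayload(String aggregated) {
        int idx = aggregated.indexOf(FINAL);
        return idx >= 0 ? aggregated.substring(idx + FINAL.length()) : null;
    }
}
```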
Processor split
JsonStreamingProcessor still exists in the OSS core and is used by DefaultStreamingHandler when UnifiedLLMOrchestrator.stream() is called directly. The SSE service path documented on this page uses StreamingContext and SentenceBuffer to package the already-streaming chunk flow into browser-friendly events.

Streaming Protocol & Markers

The streaming pipeline uses a marker-based protocol within the LLM output to separate narrative text from structured JSON:

| Constant | Value | Description |
|----------|-------|-------------|
| STREAMING_MARKER | ###STREAMING### | Prefixes narrative text chunks for progressive client display. |
| GENERATING_RESULT_MARKER | ###GENERATING_RESULT### | Sent when JSON_START is detected. Signals that analysis text is complete and result generation has begun. |
| JSON_START_MARKER | ===JSON_START=== | Signals the beginning of a JSON payload in the stream. |
| JSON_END_MARKER | ===JSON_END=== | Signals the end of a JSON payload in the stream. |
| FINAL_RESPONSE_MARKER | ###FINAL_RESPONSE### | Prefixes the final complete JSON response after extraction. |
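How the internal JSON markers translate into the client-facing markers can be sketched as follows. This is a simplified, hypothetical illustration of the translation described in the table, not the actual JsonStreamingProcessor code (the real processor also repairs incomplete JSON and handles markers split across chunks):

```java
// Hypothetical sketch: narrative text streams through with the STREAMING prefix;
// when the raw model output opens a JSON payload, GENERATING_RESULT is emitted;
// JSON content is buffered silently and surfaced as FINAL_RESPONSE at the end.
final class JsonMarkerSketch {
    /** Returns the client-facing event for one raw chunk, or null if withheld. */
    static String onRawChunk(String chunk, StringBuilder jsonBuffer, boolean[] inJson) {
        if (chunk.contains("===JSON_START===")) {
            inJson[0] = true;
            return "###GENERATING_RESULT###";              // signal the client
        }
        if (chunk.contains("===JSON_END===")) {
            inJson[0] = false;
            return "###FINAL_RESPONSE###" + jsonBuffer;    // surface the payload
        }
        if (inJson[0]) {                                   // accumulate JSON silently
            jsonBuffer.append(chunk);
            return null;
        }
        return "###STREAMING###" + chunk;                  // narrative text
    }
}
```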

Streaming Output Sequence

A typical streaming session produces events in this order:

SSE Events
###STREAMING###Based on the user's preferences for...
###STREAMING###lightweight and portable devices, I recommend...
###STREAMING###the following products:
###GENERATING_RESULT###
###FINAL_RESPONSE###{"recommendations":[...],"confidence":0.88,"reasoning":"..."}

Server-Side Implementation

Strategy/Lab Path (Full AI Diagnosis Process)

Uses AINativeProcessor for routing, distributed locking, and audit logging:

Java
@RestController
public class MyAiController {

    private final StandardStreamingService streamingService;
    private final AINativeProcessor<MyContext> aiProcessor;

    // Standard (non-streaming) response
    @PostMapping("/api/analyze")
    public Mono<MyResponse> analyze(@RequestBody AnalysisInput body) {
        AIRequest<MyContext> request = buildRequest(body,
            "MY_TEMPLATE");
        return streamingService.process(
            request, aiProcessor, MyResponse.class);
    }

    // SSE streaming response
    @PostMapping(value = "/api/analyze/stream",
        produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> analyzeStream(
            @RequestBody AnalysisInput body) {
        AIRequest<MyContext> request = buildRequest(body,
            "MY_TEMPLATE_STREAMING");
        return streamingService.stream(request, aiProcessor);
    }
}

Direct Pipeline Path (Simple LLM Calls)

Bypasses the Strategy/Lab layer for simpler use cases:

Java
@PostMapping(value = "/api/chat/stream",
    produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
        @RequestBody ChatInput body) {
    AIRequest<ChatContext> request = buildChatRequest(body);
    return streamingService.stream(
        request, pipelineOrchestrator);
}

Lab Streaming Implementation

In your Lab's doProcessStream(), configure the pipeline for streaming and call orchestrator.executeStream():

Java
@Override
protected Flux<String> doProcessStream(MyRequest request) {
    AIRequest<MyContext> aiRequest =
        buildAIRequest(request, "MY_TEMPLATE_STREAMING");

    PipelineConfiguration config = PipelineConfiguration.builder()
        .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
        .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
        .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
        .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
        .enableStreaming(true)
        .timeoutSeconds(120)
        .build();

    return orchestrator.executeStream(aiRequest, config);
}

Error Streaming

StandardStreamingService provides an error stream for validation failures:

Java
if (body.getQuery() == null || body.getQuery().isBlank()) {
    return streamingService.errorStream(
        "VALIDATION_ERROR", "Query is required");
}

Client-Side Integration

The client receives SSE events and needs to handle the streaming protocol markers. Here is a complete JavaScript implementation using fetch + ReadableStream:

Streaming Client

JavaScript
function streamRequest(url, requestData, callbacks) {
    fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(requestData)
    }).then(function(response) {
        if (!response.ok) throw new Error('HTTP ' + response.status);

        var reader = response.body.getReader();
        var decoder = new TextDecoder();
        var buffer = '';

        function processStream() {
            reader.read().then(function(result) {
                if (result.done) {
                    if (callbacks.onComplete) callbacks.onComplete();
                    return;
                }

                buffer += decoder.decode(result.value, { stream: true });
                var lines = buffer.split('\n');
                buffer = lines.pop(); // Keep incomplete line

                lines.forEach(function(line) {
                    if (!line.startsWith('data:')) return;
                    var data = line.substring(5).trim();
                    if (!data) return;

                    handleStreamEvent(data, callbacks);
                });

                processStream(); // Continue reading
            }).catch(function(err) {
                // Surface mid-stream read errors; the outer .catch only
                // covers the initial fetch, not subsequent reader.read() calls
                if (callbacks.onError) callbacks.onError(err.message);
            });
        }

        processStream();
    }).catch(function(err) {
        if (callbacks.onError) callbacks.onError(err.message);
    });
}

function handleStreamEvent(data, callbacks) {
    if (data.startsWith('###STREAMING###')) {
        // Progressive text chunk - display immediately
        var text = data.substring('###STREAMING###'.length);
        if (callbacks.onChunk) callbacks.onChunk(text);

    } else if (data.startsWith('###GENERATING_RESULT###')) {
        // Analysis complete, generating structured result
        if (callbacks.onGenerating) callbacks.onGenerating();

    } else if (data.startsWith('###FINAL_RESPONSE###')) {
        // Final structured JSON response
        var json = data.substring('###FINAL_RESPONSE###'.length);
        if (callbacks.onFinalResponse) callbacks.onFinalResponse(json);

    } else if (data === '[DONE]') {
        // Stream complete
        if (callbacks.onComplete) callbacks.onComplete();

    } else if (data.startsWith('ERROR:')) {
        // Error event
        if (callbacks.onError) callbacks.onError(data);
    }
}

Usage Example

JavaScript
var outputEl = document.getElementById('output');
var fullText = '';

streamRequest('/api/analyze/stream', { query: 'Analyze this...' }, {
    onChunk: function(text) {
        // Append text progressively
        fullText += text;
        outputEl.textContent = fullText;
    },
    onGenerating: function() {
        // Show "generating result..." indicator
        outputEl.textContent += '\n\nGenerating structured result...';
    },
    onFinalResponse: function(json) {
        // Parse and display structured result
        try {
            var result = JSON.parse(json);
            displayResult(result);
        } catch (e) {
            outputEl.textContent = fullText; // Keep streamed text
        }
    },
    onComplete: function() {
        // Re-enable UI controls
        setButtonsEnabled(true);
    },
    onError: function(err) {
        outputEl.textContent = 'Error: ' + err;
        setButtonsEnabled(true);
    }
});
OSS Client Scope
The OSS core documents the streaming protocol and server-side contracts, but it does not ship a bundled browser client such as contexa-streaming.js. Implement the SSE client in your own frontend or reuse the protocol markers shown above.

Configuration

| Property | Description | Default |
|----------|-------------|---------|
| contexa.streaming.final-response-marker | Marker used to locate the final structured payload in the aggregated stream | ###FINAL_RESPONSE### |
| contexa.streaming.streaming-marker | Prefix used for progressive narrative text chunks | ###STREAMING### |
| contexa.streaming.json-start-marker | Marker used by JSON chunk processors when raw model output starts a structured payload | ===JSON_START=== |
| contexa.streaming.json-end-marker | Marker used by JSON chunk processors when raw model output ends a structured payload | ===JSON_END=== |
| contexa.streaming.timeout | Maximum duration for a streaming session before timeout handling applies | PT5M |
| contexa.streaming.max-retries | Maximum retry count exposed on StreamingProperties | 3 |
| contexa.streaming.retry-delay | Initial retry delay exposed on StreamingProperties | PT1S |
| contexa.streaming.retry-multiplier | Retry backoff multiplier exposed on StreamingProperties | 1.5 |
| contexa.streaming.marker-buffer-size | Additional tail buffer retained while scanning for final-response markers | 100 |
| contexa.streaming.sentence-buffering-enabled | Controls sentence-level buffering before partial chunks are emitted | true |
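These properties can be set in your application configuration; the following YAML sketch simply restates the defaults from the table above (durations use ISO-8601 notation):

```yaml
# Sketch: streaming properties shown with their default values.
contexa:
  streaming:
    final-response-marker: "###FINAL_RESPONSE###"
    streaming-marker: "###STREAMING###"
    json-start-marker: "===JSON_START==="
    json-end-marker: "===JSON_END==="
    timeout: PT5M                       # 5 minutes
    max-retries: 3
    retry-delay: PT1S                   # 1 second initial delay
    retry-multiplier: 1.5
    marker-buffer-size: 100
    sentence-buffering-enabled: true
```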
Full Configuration Reference
See AI Configuration for all streaming properties.

API Reference

StandardStreamingService
public interface StandardStreamingService
stream(AIRequest<C> request, AICoreOperations<C> aiProcessor) → Flux<ServerSentEvent<String>>
Strategy/Lab streaming path via AICoreOperations.processStream().
stream(AIRequest<C> request, PipelineOrchestrator pipelineOrchestrator) → Flux<ServerSentEvent<String>>
Direct pipeline streaming path.
process(AIRequest<C> request, AICoreOperations<C> aiProcessor, Class<R> responseType) → Mono<R>
Non-streaming single response via Strategy/Lab path.
process(AIRequest<C> request, PipelineOrchestrator orchestrator, Class<R> responseType) → Mono<R>
Non-streaming single response via pipeline path.
errorStream(String errorCode, String message) → Flux<ServerSentEvent<String>>
Creates an SSE error stream with JSON error payload.
StreamingProtocol Constants
public final class StreamingProtocol
| Constant | Value |
|----------|-------|
| STREAMING_MARKER | ###STREAMING### |
| FINAL_RESPONSE_MARKER | ###FINAL_RESPONSE### |
| JSON_START_MARKER | ===JSON_START=== |
| JSON_END_MARKER | ===JSON_END=== |
| GENERATING_RESULT_MARKER | ###GENERATING_RESULT### |
ChunkProcessor Interface
public interface ChunkProcessor
process(Flux<String> upstream) → Flux<String>
Processes the upstream flux of raw chunks and returns transformed output.
getProcessorType() → String
Returns a string identifier for this processor.

Built-in: JsonStreamingProcessor handles JSON extraction with protocol markers and includes JSON repair logic for incomplete LLM output.
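A custom processor plugs into the chunk flow by implementing this two-method interface. A minimal sketch using Project Reactor follows; the filtering logic is purely illustrative (not a built-in processor), and the interface is reproduced from the reference above so the sketch compiles standalone:

```java
import reactor.core.publisher.Flux;

// The interface as documented above, reproduced so this sketch is self-contained.
interface ChunkProcessor {
    Flux<String> process(Flux<String> upstream);
    String getProcessorType();
}

// Illustrative processor: drops blank chunks before downstream marker handling.
class BlankChunkFilter implements ChunkProcessor {

    @Override
    public Flux<String> process(Flux<String> upstream) {
        // Pass through only chunks that carry visible content.
        return upstream.filter(chunk -> chunk != null && !chunk.isBlank());
    }

    @Override
    public String getProcessorType() {
        return "blank-chunk-filter";
    }
}
```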