Streaming
Contexa provides real-time Server-Sent Events (SSE) streaming for AI responses. The streaming subsystem processes raw LLM chunks, detects protocol markers, extracts structured JSON payloads, and delivers them to clients through two execution paths: the Strategy/Lab path and the direct Pipeline path.
Overview: Two Streaming Paths
Streaming supports two execution paths, both producing `Flux<ServerSentEvent<String>>`:

| Path | Entry Point | Use When |
|---|---|---|
| Strategy/Lab | `streamingService.stream(request, aiProcessor)` | Full AI Diagnosis Process with routing, audit, and distributed locking |
| Direct Pipeline | `streamingService.stream(request, pipelineOrchestrator)` | Simple LLM calls or custom prompts without strategy routing |
Both paths share the same streaming protocol, marker detection, and client-side handling.
Streaming Architecture
Data flows from the LLM, through the streaming service, to the client browser.

The `DefaultStandardStreamingService` orchestrates the SSE path:

- Calls `AICoreOperations.processStream()` or `PipelineOrchestrator.executeStream()` to get a `Flux<String>` of raw chunks and protocol markers
- Appends chunks into `StreamingContext`, which keeps track of final-response markers and sentence-buffering state
- Emits narrative text progressively through `SentenceBuffer`
- If upstream emits `###GENERATING_RESULT###`, forwards it immediately to the SSE client
- When accumulated data contains `###FINAL_RESPONSE###`, extracts that structured tail and emits it as the final SSE payload
- Wraps each outbound item in a `ServerSentEvent` for delivery
`JsonStreamingProcessor` still exists in the OSS core and is used by `DefaultStreamingHandler` when `UnifiedLLMOrchestrator.stream()` is called directly. The SSE service path documented on this page uses `StreamingContext` and `SentenceBuffer` to package the already-streaming chunk flow into browser-friendly events.
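As a rough illustration of the final-payload extraction step above, the marker scan can be sketched as a small helper. The class and method names here are hypothetical; the real `StreamingContext` tracks more state (sentence buffering, partial-marker tails) than this sketch shows:

```java
// Hypothetical sketch of final-response extraction from accumulated stream
// text. Not the actual DefaultStandardStreamingService implementation.
final class FinalResponseExtractor {

    static final String FINAL_RESPONSE_MARKER = "###FINAL_RESPONSE###";

    // Returns the structured JSON tail after the marker, or null if the
    // marker has not appeared in the accumulated data yet.
    static String extractFinalPayload(String accumulated) {
        int idx = accumulated.lastIndexOf(FINAL_RESPONSE_MARKER);
        if (idx < 0) {
            return null;
        }
        return accumulated.substring(idx + FINAL_RESPONSE_MARKER.length()).trim();
    }
}
```

Returning `null` while the marker is absent mirrors the service behavior of continuing to stream narrative chunks until the structured tail arrives.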
Streaming Protocol & Markers
The streaming pipeline uses a marker-based protocol within the LLM output to separate narrative text from structured JSON:
| Constant | Value | Description |
|---|---|---|
| `STREAMING_MARKER` | `###STREAMING###` | Prefixes narrative text chunks for progressive client display. |
| `GENERATING_RESULT_MARKER` | `###GENERATING_RESULT###` | Sent when `JSON_START_MARKER` is detected. Signals that analysis text is complete and result generation has begun. |
| `JSON_START_MARKER` | `===JSON_START===` | Signals the beginning of a JSON payload in the stream. |
| `JSON_END_MARKER` | `===JSON_END===` | Signals the end of a JSON payload in the stream. |
| `FINAL_RESPONSE_MARKER` | `###FINAL_RESPONSE###` | Prefixes the final complete JSON response after extraction. |
Streaming Output Sequence
A typical streaming session produces events in this order:
```
###STREAMING###Based on the user's preferences for...
###STREAMING###lightweight and portable devices, I recommend...
###STREAMING###the following products:
###GENERATING_RESULT###
###FINAL_RESPONSE###{"recommendations":[...],"confidence":0.88,"reasoning":"..."}
```
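The sequence above can be classified by matching on the marker prefixes. This is an illustrative dispatcher, not a shipped API; the enum and class names are assumptions for the sketch:

```java
// Illustrative event classification by protocol marker. The event-type
// names are assumptions, not part of the Contexa API.
enum StreamEventType { NARRATIVE, GENERATING_RESULT, FINAL_RESPONSE, UNKNOWN }

final class MarkerClassifier {
    static StreamEventType classify(String line) {
        if (line.startsWith("###STREAMING###")) return StreamEventType.NARRATIVE;
        if (line.startsWith("###GENERATING_RESULT###")) return StreamEventType.GENERATING_RESULT;
        if (line.startsWith("###FINAL_RESPONSE###")) return StreamEventType.FINAL_RESPONSE;
        return StreamEventType.UNKNOWN;
    }
}
```

The JavaScript client in the Client-Side Integration section performs the same prefix dispatch in `handleStreamEvent()`.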
Server-Side Implementation
Strategy/Lab Path (Full AI Diagnosis Process)
Uses `AINativeProcessor` for routing, distributed locking, and audit logging:
```java
@RestController
public class MyAiController {

    private final StandardStreamingService streamingService;
    private final AINativeProcessor<MyContext> aiProcessor;

    // Standard (non-streaming) response
    @PostMapping("/api/analyze")
    public Mono<MyResponse> analyze(@RequestBody AnalysisInput body) {
        AIRequest<MyContext> request = buildRequest(body, "MY_TEMPLATE");
        return streamingService.process(request, aiProcessor, MyResponse.class);
    }

    // SSE streaming response
    @PostMapping(value = "/api/analyze/stream",
                 produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> analyzeStream(
            @RequestBody AnalysisInput body) {
        AIRequest<MyContext> request = buildRequest(body, "MY_TEMPLATE_STREAMING");
        return streamingService.stream(request, aiProcessor);
    }
}
```
Direct Pipeline Path (Simple LLM Calls)
Bypasses the Strategy/Lab layer for simpler use cases:
```java
@PostMapping(value = "/api/chat/stream",
             produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(@RequestBody ChatInput body) {
    AIRequest<ChatContext> request = buildChatRequest(body);
    return streamingService.stream(request, pipelineOrchestrator);
}
```
Lab Streaming Implementation
In your Lab's `doProcessStream()`, configure the pipeline for streaming and call `orchestrator.executeStream()`:
```java
@Override
protected Flux<String> doProcessStream(MyRequest request) {
    AIRequest<MyContext> aiRequest =
            buildAIRequest(request, "MY_TEMPLATE_STREAMING");

    PipelineConfiguration config = PipelineConfiguration.builder()
            .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
            .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
            .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
            .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
            .enableStreaming(true)
            .timeoutSeconds(120)
            .build();

    return orchestrator.executeStream(aiRequest, config);
}
```
Error Streaming
`StandardStreamingService` provides an error stream for validation failures:
```java
if (body.getQuery() == null || body.getQuery().isBlank()) {
    return streamingService.errorStream(
            "VALIDATION_ERROR", "Query is required");
}
```
Client-Side Integration
The client receives SSE events and needs to handle the streaming protocol markers. Here is a complete JavaScript implementation using `fetch` + `ReadableStream`:
Streaming Client
```javascript
function streamRequest(url, requestData, callbacks) {
  fetch(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(requestData)
  }).then(function (response) {
    if (!response.ok) throw new Error('HTTP ' + response.status);
    var reader = response.body.getReader();
    var decoder = new TextDecoder();
    var buffer = '';

    function processStream() {
      reader.read().then(function (result) {
        if (result.done) {
          if (callbacks.onComplete) callbacks.onComplete();
          return;
        }
        buffer += decoder.decode(result.value, { stream: true });
        var lines = buffer.split('\n');
        buffer = lines.pop(); // Keep the incomplete trailing line
        lines.forEach(function (line) {
          if (!line.startsWith('data:')) return;
          var data = line.substring(5).trim();
          if (!data) return;
          handleStreamEvent(data, callbacks);
        });
        processStream(); // Continue reading
      }).catch(function (err) {
        // Surface mid-stream read failures, not just fetch failures
        if (callbacks.onError) callbacks.onError(err.message);
      });
    }
    processStream();
  }).catch(function (err) {
    if (callbacks.onError) callbacks.onError(err.message);
  });
}

function handleStreamEvent(data, callbacks) {
  if (data.startsWith('###STREAMING###')) {
    // Progressive text chunk - display immediately
    var text = data.substring('###STREAMING###'.length);
    if (callbacks.onChunk) callbacks.onChunk(text);
  } else if (data.startsWith('###GENERATING_RESULT###')) {
    // Analysis complete, generating structured result
    if (callbacks.onGenerating) callbacks.onGenerating();
  } else if (data.startsWith('###FINAL_RESPONSE###')) {
    // Final structured JSON response
    var json = data.substring('###FINAL_RESPONSE###'.length);
    if (callbacks.onFinalResponse) callbacks.onFinalResponse(json);
  } else if (data === '[DONE]') {
    // Stream complete
    if (callbacks.onComplete) callbacks.onComplete();
  } else if (data.startsWith('ERROR:')) {
    // Error event
    if (callbacks.onError) callbacks.onError(data);
  }
}
```
Usage Example
```javascript
var outputEl = document.getElementById('output');
var fullText = '';

streamRequest('/api/analyze/stream', { query: 'Analyze this...' }, {
  onChunk: function (text) {
    // Append text progressively
    fullText += text;
    outputEl.textContent = fullText;
  },
  onGenerating: function () {
    // Show "generating result..." indicator
    outputEl.textContent += '\n\nGenerating structured result...';
  },
  onFinalResponse: function (json) {
    // Parse and display structured result
    try {
      var result = JSON.parse(json);
      displayResult(result);
    } catch (e) {
      outputEl.textContent = fullText; // Keep streamed text
    }
  },
  onComplete: function () {
    // Re-enable UI controls
    setButtonsEnabled(true);
  },
  onError: function (err) {
    outputEl.textContent = 'Error: ' + err;
    setButtonsEnabled(true);
  }
});
```
The OSS core documents the streaming protocol and server-side contracts, but it does not ship a bundled browser client such as `contexa-streaming.js`. Implement the SSE client in your own frontend or reuse the protocol markers shown above.
Configuration
| Property | Description | Default |
|---|---|---|
| `contexa.streaming.final-response-marker` | Marker used to locate the final structured payload in the aggregated stream | `###FINAL_RESPONSE###` |
| `contexa.streaming.streaming-marker` | Prefix used for progressive narrative text chunks | `###STREAMING###` |
| `contexa.streaming.json-start-marker` | Marker used by JSON chunk processors when raw model output starts a structured payload | `===JSON_START===` |
| `contexa.streaming.json-end-marker` | Marker used by JSON chunk processors when raw model output ends a structured payload | `===JSON_END===` |
| `contexa.streaming.timeout` | Maximum duration for a streaming session before timeout handling applies | `PT5M` |
| `contexa.streaming.max-retries` | Maximum retry count exposed on `StreamingProperties` | `3` |
| `contexa.streaming.retry-delay` | Initial retry delay exposed on `StreamingProperties` | `PT1S` |
| `contexa.streaming.retry-multiplier` | Retry backoff multiplier exposed on `StreamingProperties` | `1.5` |
| `contexa.streaming.marker-buffer-size` | Additional tail buffer retained while scanning for final-response markers | `100` |
| `contexa.streaming.sentence-buffering-enabled` | Controls sentence-level buffering before partial chunks are emitted | `true` |
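Assuming standard Spring Boot relaxed property binding, these properties could be set in `application.yml` as follows (values shown are the defaults from the table above):

```yaml
contexa:
  streaming:
    final-response-marker: "###FINAL_RESPONSE###"
    streaming-marker: "###STREAMING###"
    json-start-marker: "===JSON_START==="
    json-end-marker: "===JSON_END==="
    timeout: PT5M          # ISO-8601 duration
    max-retries: 3
    retry-delay: PT1S
    retry-multiplier: 1.5
    marker-buffer-size: 100
    sentence-buffering-enabled: true
```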
See AI Configuration for all streaming properties.
API Reference
StandardStreamingService
`public interface StandardStreamingService`

Orchestrates standard and streaming responses; the Strategy/Lab path delegates to `AICoreOperations.processStream()`.

StreamingProtocol Constants

`public final class StreamingProtocol`
| Constant | Value |
|---|---|
| `STREAMING_MARKER` | `###STREAMING###` |
| `FINAL_RESPONSE_MARKER` | `###FINAL_RESPONSE###` |
| `JSON_START_MARKER` | `===JSON_START===` |
| `JSON_END_MARKER` | `===JSON_END===` |
| `GENERATING_RESULT_MARKER` | `###GENERATING_RESULT###` |
ChunkProcessor Interface
`public interface ChunkProcessor`

Built-in: `JsonStreamingProcessor` handles JSON extraction with protocol markers and includes JSON repair logic for incomplete LLM output.
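The repair logic itself is internal to `JsonStreamingProcessor`; as a rough idea of what repairing truncated LLM output involves, a naive sketch might close unbalanced strings, braces, and brackets. This is an assumption about the general approach, not the actual implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Naive illustrative repair for truncated JSON: close any unterminated
// string, then close unbalanced braces/brackets in reverse nesting order.
// The real JsonStreamingProcessor repair logic is likely more sophisticated.
final class NaiveJsonRepair {
    static String repair(String json) {
        Deque<Character> open = new ArrayDeque<>();
        boolean inString = false, escaped = false;
        for (char c : json.toCharArray()) {
            if (escaped) { escaped = false; continue; }
            if (c == '\\') { escaped = true; continue; }
            if (c == '"') { inString = !inString; continue; }
            if (inString) continue; // Ignore structure chars inside strings
            if (c == '{' || c == '[') open.push(c);
            else if ((c == '}' || c == ']') && !open.isEmpty()) open.pop();
        }
        StringBuilder sb = new StringBuilder(json);
        if (inString) sb.append('"');
        while (!open.isEmpty()) sb.append(open.pop() == '{' ? '}' : ']');
        return sb.toString();
    }
}
```

A cut-off payload like `{"recommendations":[{"id":1` would be completed to `{"recommendations":[{"id":1}]}`; already-balanced input passes through unchanged.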