스트리밍
Contexa는 AI 응답에 대해 실시간 Server-Sent Events(SSE) 스트리밍을 제공합니다. 스트리밍 서브시스템은 원시 LLM 청크를 처리하고, 프로토콜 마커를 감지하며, 구조화된 JSON 페이로드를 추출하여 두 가지 실행 경로 — Strategy/Lab 경로와 직접 Pipeline 경로 — 를 통해 클라이언트에 전달합니다.
개요: 두 가지 스트리밍 경로
스트리밍은 두 가지 실행 경로를 지원하며, 둘 다 Flux<ServerSentEvent<String>>을 생성합니다:
| 경로 | 진입점 | 사용 시점 |
|---|---|---|
| Strategy/Lab | streamingService.stream(request, aiProcessor) |
라우팅, 감사, 분산 잠금이 포함된 전체 AI 진단 프로세스 |
| Direct Pipeline | streamingService.stream(request, pipelineOrchestrator) |
strategy 라우팅 없이 간단한 LLM 호출 또는 커스텀 프롬프트 |
두 경로 모두 동일한 스트리밍 프로토콜, 마커 감지, 클라이언트 측 처리를 공유합니다.
스트리밍 아키텍처
LLM에서 클라이언트 브라우저까지의 데이터 흐름:
DefaultStandardStreamingService가 SSE 경로를 오케스트레이션합니다:
AICoreOperations.processStream()또는PipelineOrchestrator.executeStream()을 호출하여 원시 청크와 프로토콜 마커의Flux<String>을 가져옵니다.- 청크를
StreamingContext에 추가하여 최종 응답 마커와 문장 버퍼링 상태를 추적합니다. - 서술형 텍스트는
SentenceBuffer를 통해 점진적으로 방출됩니다. - 업스트림이
###GENERATING_RESULT###를 방출하면, 서비스는 이를 즉시 SSE 클라이언트로 전달합니다. - 누적된 데이터에
###FINAL_RESPONSE###가 포함되면, 서비스는 그 구조화된 꼬리를 추출하여 최종 SSE 페이로드로 방출합니다. - 각 출력 항목은 전달을 위해
ServerSentEvent로 래핑됩니다.
JsonStreamingProcessor는 OSS 코어에 계속 존재하며, UnifiedLLMOrchestrator.stream()이 직접 호출될 때 DefaultStreamingHandler가 사용합니다. 반면 이 페이지의 SSE 서비스 경로는 StreamingContext와 SentenceBuffer를 사용해 이미 스트리밍 중인 청크 흐름을 브라우저 친화적 이벤트로 패키징합니다.
스트리밍 프로토콜 및 마커
스트리밍 파이프라인은 LLM 출력 내에서 마커 기반 프로토콜을 사용하여 서술 텍스트와 구조화된 JSON을 분리합니다:
| 상수 | 값 | 설명 |
|---|---|---|
STREAMING_MARKER | ###STREAMING### | 점진적 클라이언트 표시를 위한 서술 텍스트 청크의 접두사. |
GENERATING_RESULT_MARKER | ###GENERATING_RESULT### | JSON_START 감지 시 전송. 분석 텍스트가 완료되고 결과 생성이 시작됨을 알립니다. |
JSON_START_MARKER | ===JSON_START=== | 스트림 내 JSON 페이로드 시작 신호. |
JSON_END_MARKER | ===JSON_END=== | 스트림 내 JSON 페이로드 종료 신호. |
FINAL_RESPONSE_MARKER | ###FINAL_RESPONSE### | 추출 후 최종 완전한 JSON 응답의 접두사. |
스트리밍 출력 시퀀스
일반적 스트리밍 세션은 다음 순서로 이벤트를 생성합니다:
###STREAMING###Based on the user's preferences for...
###STREAMING###lightweight and portable devices, I recommend...
###STREAMING###the following products:
###GENERATING_RESULT###
###FINAL_RESPONSE###{"recommendations":[...],"confidence":0.88,"reasoning":"..."}
서버 측 구현
Strategy/Lab 경로 (전체 AI 진단 프로세스)
라우팅, 분산 잠금, 감사 로깅에 AINativeProcessor를 사용합니다:
@RestController
public class MyAiController {
private final StandardStreamingService streamingService;
private final AINativeProcessor<MyContext> aiProcessor;
// Standard (non-streaming) response
@PostMapping("/api/analyze")
public Mono<MyResponse> analyze(@RequestBody AnalysisInput body) {
AIRequest<MyContext> request = buildRequest(body,
"MY_TEMPLATE");
return streamingService.process(
request, aiProcessor, MyResponse.class);
}
// SSE streaming response
@PostMapping(value = "/api/analyze/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> analyzeStream(
@RequestBody AnalysisInput body) {
AIRequest<MyContext> request = buildRequest(body,
"MY_TEMPLATE_STREAMING");
return streamingService.stream(request, aiProcessor);
}
}
Direct Pipeline 경로 (간단한 LLM 호출)
더 간단한 사용 사례를 위해 Strategy/Lab 계층을 우회합니다:
@PostMapping(value = "/api/chat/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
@RequestBody ChatInput body) {
AIRequest<ChatContext> request = buildChatRequest(body);
return streamingService.stream(
request, pipelineOrchestrator);
}
Lab 스트리밍 구현
Lab의 doProcessStream()에서 스트리밍용으로 pipeline을 구성하고 orchestrator.executeStream()을 호출합니다:
@Override
protected Flux<String> doProcessStream(MyRequest request) {
AIRequest<MyContext> aiRequest =
buildAIRequest(request, "MY_TEMPLATE_STREAMING");
PipelineConfiguration config = PipelineConfiguration.builder()
.addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
.addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
.addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
.addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
.enableStreaming(true)
.timeoutSeconds(120)
.build();
return orchestrator.executeStream(aiRequest, config);
}
오류 스트리밍
StandardStreamingService는 검증 실패에 대한 오류 스트림을 제공합니다:
if (body.getQuery() == null || body.getQuery().isBlank()) {
return streamingService.errorStream(
"VALIDATION_ERROR", "Query is required");
}
클라이언트 측 통합
클라이언트는 SSE 이벤트를 수신하며 스트리밍 프로토콜 마커를 처리해야 합니다. 다음은 fetch와 ReadableStream을 사용하는 완전한 JavaScript 구현입니다:
스트리밍 클라이언트
function streamRequest(url, requestData, callbacks) {
fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(requestData)
}).then(function(response) {
if (!response.ok) throw new Error('HTTP ' + response.status);
var reader = response.body.getReader();
var decoder = new TextDecoder();
var buffer = '';
function processStream() {
reader.read().then(function(result) {
if (result.done) {
if (callbacks.onComplete) callbacks.onComplete();
return;
}
buffer += decoder.decode(result.value, { stream: true });
var lines = buffer.split('\n');
buffer = lines.pop(); // Keep incomplete line
lines.forEach(function(line) {
if (!line.startsWith('data:')) return;
var data = line.substring(5).trim();
if (!data || data === '') return;
handleStreamEvent(data, callbacks);
});
processStream(); // Continue reading
});
}
processStream();
}).catch(function(err) {
if (callbacks.onError) callbacks.onError(err.message);
});
}
function handleStreamEvent(data, callbacks) {
if (data.startsWith('###STREAMING###')) {
// Progressive text chunk - display immediately
var text = data.substring('###STREAMING###'.length);
if (callbacks.onChunk) callbacks.onChunk(text);
} else if (data.startsWith('###GENERATING_RESULT###')) {
// Analysis complete, generating structured result
if (callbacks.onGenerating) callbacks.onGenerating();
} else if (data.startsWith('###FINAL_RESPONSE###')) {
// Final structured JSON response
var json = data.substring('###FINAL_RESPONSE###'.length);
if (callbacks.onFinalResponse) callbacks.onFinalResponse(json);
} else if (data === '[DONE]') {
// Stream complete
if (callbacks.onComplete) callbacks.onComplete();
} else if (data.startsWith('ERROR:')) {
// Error event
if (callbacks.onError) callbacks.onError(data);
}
}
사용 예제
var outputEl = document.getElementById('output');
var fullText = '';
streamRequest('/api/analyze/stream', { query: 'Analyze this...' }, {
onChunk: function(text) {
// Append text progressively
fullText += text;
outputEl.textContent = fullText;
},
onGenerating: function() {
// Show "generating result..." indicator
outputEl.textContent += '\n\nGenerating structured result...';
},
onFinalResponse: function(json) {
// Parse and display structured result
try {
var result = JSON.parse(json);
displayResult(result);
} catch (e) {
outputEl.textContent = fullText; // Keep streamed text
}
},
onComplete: function() {
// Re-enable UI controls
setButtonsEnabled(true);
},
onError: function(err) {
outputEl.textContent = 'Error: ' + err;
setButtonsEnabled(true);
}
});
OSS 코어는 스트리밍 프로토콜과 서버 측 계약을 문서화하지만,
contexa-streaming.js 같은 번들 브라우저 클라이언트는 제공하지 않습니다. 자체 프론트엔드에서 SSE 클라이언트를 구현하거나 위에 표시된 프로토콜 마커를 재사용하세요.
설정
| 속성 | 설명 | 기본값 |
|---|---|---|
contexa.streaming.final-response-marker | 집계 스트림에서 최종 구조화 페이로드를 찾기 위한 마커 | ###FINAL_RESPONSE### |
contexa.streaming.streaming-marker | 점진적 서술 텍스트 청크에 사용되는 접두사 | ###STREAMING### |
contexa.streaming.json-start-marker | 원시 모델 출력이 구조화 페이로드를 시작할 때 JSON 청크 프로세서가 사용하는 마커 | ===JSON_START=== |
contexa.streaming.json-end-marker | 원시 모델 출력이 구조화 페이로드를 종료할 때 JSON 청크 프로세서가 사용하는 마커 | ===JSON_END=== |
contexa.streaming.timeout | 타임아웃 처리가 적용되기 전 스트리밍 세션의 최대 지속 시간 | PT5M |
contexa.streaming.max-retries | StreamingProperties에 노출된 최대 재시도 횟수 | 3 |
contexa.streaming.retry-delay | StreamingProperties에 노출된 초기 재시도 지연 | PT1S |
contexa.streaming.retry-multiplier | StreamingProperties에 노출된 재시도 백오프 배율 | 1.5 |
contexa.streaming.marker-buffer-size | 최종 응답 마커를 스캔하는 동안 유지되는 추가 꼬리 버퍼 | 100 |
contexa.streaming.sentence-buffering-enabled | 부분 청크 방출 전 문장 수준 버퍼링 제어 | true |
모든 스트리밍 속성은 AI 설정을 참조하세요.
API 레퍼런스
StandardStreamingService
public interface StandardStreamingService
AICoreOperations.processStream()을 통한 Strategy/Lab 스트리밍 경로.StreamingProtocol 상수
public final class StreamingProtocol
| 상수 | 값 |
|---|---|
STREAMING_MARKER | ###STREAMING### |
FINAL_RESPONSE_MARKER | ###FINAL_RESPONSE### |
JSON_START_MARKER | ===JSON_START=== |
JSON_END_MARKER | ===JSON_END=== |
GENERATING_RESULT_MARKER | ###GENERATING_RESULT### |
ChunkProcessor 인터페이스
public interface ChunkProcessor
내장: JsonStreamingProcessor는 프로토콜 마커를 사용한 JSON 추출과 불완전한 LLM 출력을 위한 JSON 복구 로직을 처리합니다.