Streaming
Contexa는 AI 응답을 위한 실시간 Server-Sent Events (SSE) Streaming을 제공합니다. Streaming 서브시스템은 원시 LLM 청크를 처리하고, 프로토콜 마커를 감지하며, 구조화된 JSON 페이로드를 추출해 Strategy/Lab 경로와 직접 Pipeline 경로 두 실행 경로를 통해 클라이언트로 전달합니다.
개요
Contexa의 Streaming은 LLM 출력 내에서 마커 기반 프로토콜을 사용하여 서술 텍스트와 구조화된 JSON 응답을 분리합니다. DefaultStandardStreamingService는 원시 LLM 청크를 처리하고, 완전한 문장으로 버퍼링하며, 프로토콜 마커를 감지하고, JSON 페이로드를 추출한 후, 클라이언트 전달을 위해 모든 것을 SSE 이벤트로 래핑합니다.
Streaming 아키텍처
LLM 또는 Pipeline에서 나온 청크가 브라우저로 전달되는 흐름은 다음과 같습니다.
DefaultStandardStreamingService는 SSE 경로에서 다음 순서로 동작합니다.
AICoreOperations.processStream()또는PipelineOrchestrator.executeStream()를 호출해 원시 청크와 프로토콜 마커가 포함된Flux<String>를 받습니다.- 각 청크를
StreamingContext에 누적하여 최종 응답 마커와 문장 버퍼링 상태를 추적합니다. - 서술형 텍스트는
SentenceBuffer를 통해 점진적으로 방출됩니다. - 업스트림에서
###GENERATING_RESULT###를 내보내면 즉시 SSE 클라이언트로 전달합니다. - 누적 데이터에
###FINAL_RESPONSE###가 포함되면 구조화된 꼬리 부분을 추출해 최종 SSE 페이로드로 전송합니다. - 각 항목은
ServerSentEvent로 래핑되어 브라우저에 전달됩니다.
JsonStreamingProcessor는 OSS 코어에 계속 존재하며, UnifiedLLMOrchestrator.stream()를 직접 호출할 때 DefaultStreamingHandler가 사용합니다. 반면 이 페이지의 SSE 서비스 경로는 StreamingContext와 SentenceBuffer를 사용해 이미 스트리밍 중인 청크를 브라우저 친화적인 이벤트로 패키징합니다.
StandardStreamingService
모든 Streaming 작업을 위한 주요 인터페이스입니다. 두 가지 실행 경로와 Streaming 및 비Streaming 모드를 모두 지원합니다.
public interface StandardStreamingService
AICoreOperations를 사용해 응답을 스트리밍합니다. AINativeProcessor가 주입된 경우 전략 라우팅, 감사, 분산 잠금 같은 상위 실행 흐름이 함께 적용될 수 있습니다.StreamingProtocol
텍스트, JSON, 제어 신호를 분리하기 위해 Streaming Pipeline에서 사용되는 마커 상수를 정의합니다.
public final class StreamingProtocol
| 상수 | 값 | 설명 |
|---|---|---|
FINAL_RESPONSE_MARKER | ###FINAL_RESPONSE### | 최종 완전한 JSON 응답의 접두사. |
STREAMING_MARKER | ###STREAMING### | 클라이언트 표시를 위한 서술 텍스트 청크의 접두사. |
JSON_START_MARKER | ===JSON_START=== | 스트림 내 JSON 페이로드의 시작을 알림. |
JSON_END_MARKER | ===JSON_END=== | 스트림 내 JSON 페이로드의 끝을 알림. |
GENERATING_RESULT_MARKER | ###GENERATING_RESULT### | JSON_START가 감지되면 전송되며, 분석 텍스트가 완료되고 결과 데이터가 생성 중임을 클라이언트에 알림. |
StreamingContext
Streaming 세션을 추적하는 상태 유지 컨텍스트 객체입니다. 청크를 누적하고, 마커를 감지하며, 원시 LLM 청크를 완전한 문장으로 변환하기 위한 SentenceBuffer를 관리합니다.
public class StreamingContext extends BaseStreamingContext
주요 기능:
- 청크 누적 — 마커 감지를 위한
appendChunk(). - 문장 버퍼링 — 전송 전 청크를 완전한 문장으로 그룹화하는
getSentenceBuffer(). - JSON 추출 — 누적된 스트림에서 JSON 페이로드를 감지하고 추출하는
extractJsonPart(). - 상태 추적 — 중복 발송을 방지하기 위한
isFinalResponseStarted()및isJsonSent().
ChunkProcessor
Streaming 처리 체인에 연결할 수 있는 커스텀 청크 처리 로직을 위한 인터페이스입니다. SSE 서비스 경로 자체는 StreamingContext와 SentenceBuffer를 사용하고, 직접 UnifiedLLMOrchestrator.stream()를 호출하는 경로에서는 별도의 청크 프로세서가 사용될 수 있습니다.
public interface ChunkProcessor
JsonStreamingProcessor
JsonStreamingProcessor는 ChunkProcessor 구현체로, 스트림에서 JSON_START와 JSON_END 마커를 감지하고 그 사이의 JSON 페이로드를 누적합니다. 이 프로세서는 DefaultStreamingHandler가 UnifiedLLMOrchestrator.stream()를 직접 사용할 때 활용되며, 이 페이지에서 설명하는 SSE 서비스 경로는 StreamingContext와 SentenceBuffer를 사용해 이미 스트리밍 중인 청크를 브라우저 친화적인 이벤트로 패키징합니다.
public class JsonStreamingProcessor implements ChunkProcessor
처리 흐름
JSON_START전 텍스트는STREAMING_MARKER와 함께 발행됩니다.JSON_START가 감지되면GENERATING_RESULT_MARKER가 발행됩니다.- 마커 사이의 콘텐츠는 JSON 버퍼에 누적됩니다.
JSON_END가 감지되면 JSON이FINAL_RESPONSE_MARKER와 함께 발행됩니다.JSON_END없이 스트림이 완료되면 JSON이 복구되어 발행됩니다.
코드 예제
컨트롤러에서 StandardStreamingService 사용
@GetMapping(value = "/analyze/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> analyzeStream(
@RequestParam String query) {
AIRequest<SecurityContext> request = buildRequest(query);
return streamingService.stream(request, aiNativeProcessor);
}
@PostMapping("/analyze")
public Mono<AnalysisResponse> analyze(
@RequestBody AnalysisRequest body) {
AIRequest<SecurityContext> request = buildRequest(body);
return streamingService.process(
request, aiNativeProcessor, AnalysisResponse.class);
}
직접 Pipeline Streaming
@GetMapping(value = "/chat/stream",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
@RequestParam String message) {
AIRequest<ChatContext> request = buildChatRequest(message);
return streamingService.stream(request, pipelineOrchestrator);
}
관련 문서
설정
| 속성 | 설명 | 기본값 |
|---|---|---|
contexa.streaming.final-response-marker | 집계된 스트림에서 최종 구조화 페이로드를 찾을 때 사용하는 마커 | ###FINAL_RESPONSE### |
contexa.streaming.streaming-marker | 점진적으로 표시할 서술형 텍스트 청크에 붙는 접두사 | ###STREAMING### |
contexa.streaming.json-start-marker | 원시 모델 출력이 구조화된 페이로드를 시작할 때 사용하는 JSON 시작 마커 | ===JSON_START=== |
contexa.streaming.json-end-marker | 원시 모델 출력이 구조화된 페이로드를 끝낼 때 사용하는 JSON 종료 마커 | ===JSON_END=== |
contexa.streaming.timeout | 타임아웃 처리가 적용되기 전까지 허용하는 최대 Streaming 지속 시간 | PT5M |
contexa.streaming.max-retries | StreamingProperties가 노출하는 최대 재시도 횟수 | 3 |
contexa.streaming.retry-delay | 재시도 사이의 초기 대기 시간 | PT1S |
contexa.streaming.retry-multiplier | 재시도 간 지수 백오프 배수 | 2.0 |
contexa.streaming.marker-buffer-size | 마커 탐지를 위해 유지하는 버퍼 크기 | 1000 |
contexa.streaming.sentence-buffering-enabled | SSE 전송 전에 청크를 문장 단위로 묶을지 여부 | true |