contexa-core

Streaming

Contexa는 AI 응답을 위한 실시간 Server-Sent Events (SSE) Streaming을 제공합니다. Streaming 서브시스템은 원시 LLM 청크를 처리하고, 프로토콜 마커를 감지하며, 구조화된 JSON 페이로드를 추출해 Strategy/Lab 경로와 직접 Pipeline 경로 두 실행 경로를 통해 클라이언트로 전달합니다.

개요

Contexa의 Streaming은 LLM 출력 내에서 마커 기반 프로토콜을 사용하여 서술 텍스트와 구조화된 JSON 응답을 분리합니다. DefaultStandardStreamingService는 원시 LLM 청크를 처리하고, 완전한 문장으로 버퍼링하며, 프로토콜 마커를 감지하고, JSON 페이로드를 추출한 후, 클라이언트 전달을 위해 모든 것을 SSE 이벤트로 래핑합니다.

Streaming 아키텍처

LLM 또는 Pipeline에서 나온 청크가 브라우저로 전달되는 흐름은 다음과 같습니다.

LLM / Pipeline Chunks
StreamingContext
SentenceBuffer + Marker State
SSE Events
Client Browser

DefaultStandardStreamingService는 SSE 경로에서 다음 순서로 동작합니다.

  1. AICoreOperations.processStream() 또는 PipelineOrchestrator.executeStream()를 호출해 원시 청크와 프로토콜 마커가 포함된 Flux<String>를 받습니다.
  2. 각 청크를 StreamingContext에 누적하여 최종 응답 마커와 문장 버퍼링 상태를 추적합니다.
  3. 서술형 텍스트는 SentenceBuffer를 통해 점진적으로 방출됩니다.
  4. 업스트림에서 ###GENERATING_RESULT###를 내보내면 즉시 SSE 클라이언트로 전달합니다.
  5. 누적 데이터에 ###FINAL_RESPONSE###가 포함되면 구조화된 꼬리 부분을 추출해 최종 SSE 페이로드로 전송합니다.
  6. 각 항목은 ServerSentEvent로 래핑되어 브라우저에 전달됩니다.
프로세서 분리
JsonStreamingProcessor는 OSS 코어에 계속 존재하며, UnifiedLLMOrchestrator.stream()를 직접 호출할 때 DefaultStreamingHandler가 사용합니다. 반면 이 페이지의 SSE 서비스 경로는 StreamingContextSentenceBuffer를 사용해 이미 스트리밍 중인 청크를 브라우저 친화적인 이벤트로 패키징합니다.

StandardStreamingService

모든 Streaming 작업을 위한 주요 인터페이스입니다. 두 가지 실행 경로와 Streaming 및 비Streaming 모드를 모두 지원합니다.

public interface StandardStreamingService
stream(AIRequest<C> request, AICoreOperations<C> aiProcessor) Flux<ServerSentEvent<String>>
Strategy/Lab 경로의 AICoreOperations를 사용해 응답을 스트리밍합니다. AINativeProcessor가 주입된 경우 전략 라우팅, 감사, 분산 잠금 같은 상위 실행 흐름이 함께 적용될 수 있습니다.
stream(AIRequest<C> request, PipelineOrchestrator pipelineOrchestrator) Flux<ServerSentEvent<String>>
Strategy/Lab 레이어를 우회하여 Pipeline을 통해 직접 응답을 스트리밍합니다. 단순 LLM 호출이나 커스텀 프롬프트 실행에 사용하세요.
process(AIRequest<C> request, AICoreOperations<C> aiProcessor, Class<R> responseType) Mono<R>
Strategy/Lab 경로를 통한 비Streaming 단일 응답.
process(AIRequest<C> request, PipelineOrchestrator orchestrator, Class<R> responseType) Mono<R>
Pipeline 경로를 통한 비Streaming 단일 응답.
errorStream(String errorCode, String message) Flux<ServerSentEvent<String>>
JSON 형식의 오류 페이로드가 포함된 SSE 오류 스트림을 생성합니다.

StreamingProtocol

텍스트, JSON, 제어 신호를 분리하기 위해 Streaming Pipeline에서 사용되는 마커 상수를 정의합니다.

public final class StreamingProtocol
상수설명
FINAL_RESPONSE_MARKER###FINAL_RESPONSE###최종 완전한 JSON 응답의 접두사.
STREAMING_MARKER###STREAMING###클라이언트 표시를 위한 서술 텍스트 청크의 접두사.
JSON_START_MARKER===JSON_START===스트림 내 JSON 페이로드의 시작을 알림.
JSON_END_MARKER===JSON_END===스트림 내 JSON 페이로드의 끝을 알림.
GENERATING_RESULT_MARKER###GENERATING_RESULT###JSON_START가 감지되면 전송되며, 분석 텍스트가 완료되고 결과 데이터가 생성 중임을 클라이언트에 알림.

StreamingContext

Streaming 세션을 추적하는 상태 유지 컨텍스트 객체입니다. 청크를 누적하고, 마커를 감지하며, 원시 LLM 청크를 완전한 문장으로 변환하기 위한 SentenceBuffer를 관리합니다.

public class StreamingContext extends BaseStreamingContext

주요 기능:

  • 청크 누적 — 마커 감지를 위한 appendChunk().
  • 문장 버퍼링 — 전송 전 청크를 완전한 문장으로 그룹화하는 getSentenceBuffer().
  • JSON 추출 — 누적된 스트림에서 JSON 페이로드를 감지하고 추출하는 extractJsonPart().
  • 상태 추적 — 중복 발송을 방지하기 위한 isFinalResponseStarted()isJsonSent().

ChunkProcessor

Streaming 처리 체인에 연결할 수 있는 커스텀 청크 처리 로직을 위한 인터페이스입니다. SSE 서비스 경로 자체는 StreamingContextSentenceBuffer를 사용하고, 직접 UnifiedLLMOrchestrator.stream()를 호출하는 경로에서는 별도의 청크 프로세서가 사용될 수 있습니다.

public interface ChunkProcessor
process(Flux<String> upstream) Flux<String>
원시 청크의 업스트림 flux를 처리하고 변환된 출력을 반환합니다.
getProcessorType() String
이 프로세서 타입의 문자열 식별자를 반환합니다.

JsonStreamingProcessor

JsonStreamingProcessorChunkProcessor 구현체로, 스트림에서 JSON_STARTJSON_END 마커를 감지하고 그 사이의 JSON 페이로드를 누적합니다. 이 프로세서는 DefaultStreamingHandlerUnifiedLLMOrchestrator.stream()를 직접 사용할 때 활용되며, 이 페이지에서 설명하는 SSE 서비스 경로는 StreamingContextSentenceBuffer를 사용해 이미 스트리밍 중인 청크를 브라우저 친화적인 이벤트로 패키징합니다.

public class JsonStreamingProcessor implements ChunkProcessor

처리 흐름

  1. JSON_START 전 텍스트는 STREAMING_MARKER와 함께 발행됩니다.
  2. JSON_START가 감지되면 GENERATING_RESULT_MARKER가 발행됩니다.
  3. 마커 사이의 콘텐츠는 JSON 버퍼에 누적됩니다.
  4. JSON_END가 감지되면 JSON이 FINAL_RESPONSE_MARKER와 함께 발행됩니다.
  5. JSON_END 없이 스트림이 완료되면 JSON이 복구되어 발행됩니다.

코드 예제

컨트롤러에서 StandardStreamingService 사용

Java
@GetMapping(value = "/analyze/stream",
        produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> analyzeStream(
        @RequestParam String query) {

    AIRequest<SecurityContext> request = buildRequest(query);

    return streamingService.stream(request, aiNativeProcessor);
}

@PostMapping("/analyze")
public Mono<AnalysisResponse> analyze(
        @RequestBody AnalysisRequest body) {

    AIRequest<SecurityContext> request = buildRequest(body);

    return streamingService.process(
            request, aiNativeProcessor, AnalysisResponse.class);
}

직접 Pipeline Streaming

Java
@GetMapping(value = "/chat/stream",
        produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
        @RequestParam String message) {

    AIRequest<ChatContext> request = buildChatRequest(message);

    return streamingService.stream(request, pipelineOrchestrator);
}

설정

속성설명기본값
contexa.streaming.final-response-marker집계된 스트림에서 최종 구조화 페이로드를 찾을 때 사용하는 마커###FINAL_RESPONSE###
contexa.streaming.streaming-marker점진적으로 표시할 서술형 텍스트 청크에 붙는 접두사###STREAMING###
contexa.streaming.json-start-marker원시 모델 출력이 구조화된 페이로드를 시작할 때 사용하는 JSON 시작 마커===JSON_START===
contexa.streaming.json-end-marker원시 모델 출력이 구조화된 페이로드를 끝낼 때 사용하는 JSON 종료 마커===JSON_END===
contexa.streaming.timeout타임아웃 처리가 적용되기 전까지 허용하는 최대 Streaming 지속 시간PT5M
contexa.streaming.max-retriesStreamingProperties가 노출하는 최대 재시도 횟수3
contexa.streaming.retry-delay재시도 사이의 초기 대기 시간PT1S
contexa.streaming.retry-multiplier재시도 간 지수 백오프 배수2.0
contexa.streaming.marker-buffer-size마커 탐지를 위해 유지하는 버퍼 크기1000
contexa.streaming.sentence-buffering-enabledSSE 전송 전에 청크를 문장 단위로 묶을지 여부true
설정 레퍼런스
Streaming application.yml 속성은 AI 설정을 참조하세요 — StreamingProperties를 다룹니다.