contexa-core

스트리밍

Contexa는 AI 응답에 대해 실시간 Server-Sent Events(SSE) 스트리밍을 제공합니다. 스트리밍 서브시스템은 원시 LLM 청크를 처리하고, 프로토콜 마커를 감지하며, 구조화된 JSON 페이로드를 추출하여 두 가지 실행 경로 — Strategy/Lab 경로와 직접 Pipeline 경로 — 를 통해 클라이언트에 전달합니다.

개요: 두 가지 스트리밍 경로

스트리밍은 두 가지 실행 경로를 지원하며, 둘 다 Flux<ServerSentEvent<String>>을 생성합니다:

경로진입점사용 시점
Strategy/Lab streamingService.stream(request, aiProcessor) 라우팅, 감사, 분산 잠금이 포함된 전체 AI 진단 프로세스
Direct Pipeline streamingService.stream(request, pipelineOrchestrator) strategy 라우팅 없이 간단한 LLM 호출 또는 커스텀 프롬프트

두 경로 모두 동일한 스트리밍 프로토콜, 마커 감지, 클라이언트 측 처리를 공유합니다.

스트리밍 아키텍처

LLM에서 클라이언트 브라우저까지의 데이터 흐름:

LLM / Pipeline 청크
StreamingContext
SentenceBuffer + 마커 상태
SSE 이벤트
클라이언트 브라우저

DefaultStandardStreamingService가 SSE 경로를 오케스트레이션합니다:

  1. AICoreOperations.processStream() 또는 PipelineOrchestrator.executeStream()을 호출하여 원시 청크와 프로토콜 마커의 Flux<String>을 가져옵니다.
  2. 청크를 StreamingContext에 추가하여 최종 응답 마커와 문장 버퍼링 상태를 추적합니다.
  3. 서술형 텍스트는 SentenceBuffer를 통해 점진적으로 방출됩니다.
  4. 업스트림이 ###GENERATING_RESULT###를 방출하면, 서비스는 이를 즉시 SSE 클라이언트로 전달합니다.
  5. 누적된 데이터에 ###FINAL_RESPONSE###가 포함되면, 서비스는 그 구조화된 꼬리를 추출하여 최종 SSE 페이로드로 방출합니다.
  6. 각 출력 항목은 전달을 위해 ServerSentEvent로 래핑됩니다.
프로세서 분리
JsonStreamingProcessor는 OSS 코어에 계속 존재하며, UnifiedLLMOrchestrator.stream()이 직접 호출될 때 DefaultStreamingHandler가 사용합니다. 반면 이 페이지의 SSE 서비스 경로는 StreamingContextSentenceBuffer를 사용해 이미 스트리밍 중인 청크 흐름을 브라우저 친화적 이벤트로 패키징합니다.

스트리밍 프로토콜 및 마커

스트리밍 파이프라인은 LLM 출력 내에서 마커 기반 프로토콜을 사용하여 서술 텍스트와 구조화된 JSON을 분리합니다:

상수설명
STREAMING_MARKER###STREAMING###점진적 클라이언트 표시를 위한 서술 텍스트 청크의 접두사.
GENERATING_RESULT_MARKER###GENERATING_RESULT###JSON_START 감지 시 전송. 분석 텍스트가 완료되고 결과 생성이 시작됨을 알립니다.
JSON_START_MARKER===JSON_START===스트림 내 JSON 페이로드 시작 신호.
JSON_END_MARKER===JSON_END===스트림 내 JSON 페이로드 종료 신호.
FINAL_RESPONSE_MARKER###FINAL_RESPONSE###추출 후 최종 완전한 JSON 응답의 접두사.

스트리밍 출력 시퀀스

일반적 스트리밍 세션은 다음 순서로 이벤트를 생성합니다:

SSE Events
###STREAMING###Based on the user's preferences for...
###STREAMING###lightweight and portable devices, I recommend...
###STREAMING###the following products:
###GENERATING_RESULT###
###FINAL_RESPONSE###{"recommendations":[...],"confidence":0.88,"reasoning":"..."}

서버 측 구현

Strategy/Lab 경로 (전체 AI 진단 프로세스)

라우팅, 분산 잠금, 감사 로깅에 AINativeProcessor를 사용합니다:

Java
@RestController
public class MyAiController {

    private final StandardStreamingService streamingService;
    private final AINativeProcessor<MyContext> aiProcessor;

    // Standard (non-streaming) response
    @PostMapping("/api/analyze")
    public Mono<MyResponse> analyze(@RequestBody AnalysisInput body) {
        AIRequest<MyContext> request = buildRequest(body,
            "MY_TEMPLATE");
        return streamingService.process(
            request, aiProcessor, MyResponse.class);
    }

    // SSE streaming response
    @PostMapping(value = "/api/analyze/stream",
        produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> analyzeStream(
            @RequestBody AnalysisInput body) {
        AIRequest<MyContext> request = buildRequest(body,
            "MY_TEMPLATE_STREAMING");
        return streamingService.stream(request, aiProcessor);
    }
}

Direct Pipeline 경로 (간단한 LLM 호출)

더 간단한 사용 사례를 위해 Strategy/Lab 계층을 우회합니다:

Java
@PostMapping(value = "/api/chat/stream",
    produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(
        @RequestBody ChatInput body) {
    AIRequest<ChatContext> request = buildChatRequest(body);
    return streamingService.stream(
        request, pipelineOrchestrator);
}

Lab 스트리밍 구현

Lab의 doProcessStream()에서 스트리밍용으로 pipeline을 구성하고 orchestrator.executeStream()을 호출합니다:

Java
@Override
protected Flux<String> doProcessStream(MyRequest request) {
    AIRequest<MyContext> aiRequest =
        buildAIRequest(request, "MY_TEMPLATE_STREAMING");

    PipelineConfiguration config = PipelineConfiguration.builder()
        .addStep(PipelineConfiguration.PipelineStep.CONTEXT_RETRIEVAL)
        .addStep(PipelineConfiguration.PipelineStep.PREPROCESSING)
        .addStep(PipelineConfiguration.PipelineStep.PROMPT_GENERATION)
        .addStep(PipelineConfiguration.PipelineStep.LLM_EXECUTION)
        .enableStreaming(true)
        .timeoutSeconds(120)
        .build();

    return orchestrator.executeStream(aiRequest, config);
}

오류 스트리밍

StandardStreamingService는 검증 실패에 대한 오류 스트림을 제공합니다:

Java
if (body.getQuery() == null || body.getQuery().isBlank()) {
    return streamingService.errorStream(
        "VALIDATION_ERROR", "Query is required");
}

클라이언트 측 통합

클라이언트는 SSE 이벤트를 수신하며 스트리밍 프로토콜 마커를 처리해야 합니다. 다음은 fetchReadableStream을 사용하는 완전한 JavaScript 구현입니다:

스트리밍 클라이언트

JavaScript
function streamRequest(url, requestData, callbacks) {
    fetch(url, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(requestData)
    }).then(function(response) {
        if (!response.ok) throw new Error('HTTP ' + response.status);

        var reader = response.body.getReader();
        var decoder = new TextDecoder();
        var buffer = '';

        function processStream() {
            reader.read().then(function(result) {
                if (result.done) {
                    if (callbacks.onComplete) callbacks.onComplete();
                    return;
                }

                buffer += decoder.decode(result.value, { stream: true });
                var lines = buffer.split('\n');
                buffer = lines.pop(); // Keep incomplete line

                lines.forEach(function(line) {
                    if (!line.startsWith('data:')) return;
                    var data = line.substring(5).trim();
                    if (!data || data === '') return;

                    handleStreamEvent(data, callbacks);
                });

                processStream(); // Continue reading
            });
        }

        processStream();
    }).catch(function(err) {
        if (callbacks.onError) callbacks.onError(err.message);
    });
}

function handleStreamEvent(data, callbacks) {
    if (data.startsWith('###STREAMING###')) {
        // Progressive text chunk - display immediately
        var text = data.substring('###STREAMING###'.length);
        if (callbacks.onChunk) callbacks.onChunk(text);

    } else if (data.startsWith('###GENERATING_RESULT###')) {
        // Analysis complete, generating structured result
        if (callbacks.onGenerating) callbacks.onGenerating();

    } else if (data.startsWith('###FINAL_RESPONSE###')) {
        // Final structured JSON response
        var json = data.substring('###FINAL_RESPONSE###'.length);
        if (callbacks.onFinalResponse) callbacks.onFinalResponse(json);

    } else if (data === '[DONE]') {
        // Stream complete
        if (callbacks.onComplete) callbacks.onComplete();

    } else if (data.startsWith('ERROR:')) {
        // Error event
        if (callbacks.onError) callbacks.onError(data);
    }
}

사용 예제

JavaScript
var outputEl = document.getElementById('output');
var fullText = '';

streamRequest('/api/analyze/stream', { query: 'Analyze this...' }, {
    onChunk: function(text) {
        // Append text progressively
        fullText += text;
        outputEl.textContent = fullText;
    },
    onGenerating: function() {
        // Show "generating result..." indicator
        outputEl.textContent += '\n\nGenerating structured result...';
    },
    onFinalResponse: function(json) {
        // Parse and display structured result
        try {
            var result = JSON.parse(json);
            displayResult(result);
        } catch (e) {
            outputEl.textContent = fullText; // Keep streamed text
        }
    },
    onComplete: function() {
        // Re-enable UI controls
        setButtonsEnabled(true);
    },
    onError: function(err) {
        outputEl.textContent = 'Error: ' + err;
        setButtonsEnabled(true);
    }
});
OSS 클라이언트 범위
OSS 코어는 스트리밍 프로토콜과 서버 측 계약을 문서화하지만, contexa-streaming.js 같은 번들 브라우저 클라이언트는 제공하지 않습니다. 자체 프론트엔드에서 SSE 클라이언트를 구현하거나 위에 표시된 프로토콜 마커를 재사용하세요.

설정

속성설명기본값
contexa.streaming.final-response-marker집계 스트림에서 최종 구조화 페이로드를 찾기 위한 마커###FINAL_RESPONSE###
contexa.streaming.streaming-marker점진적 서술 텍스트 청크에 사용되는 접두사###STREAMING###
contexa.streaming.json-start-marker원시 모델 출력이 구조화 페이로드를 시작할 때 JSON 청크 프로세서가 사용하는 마커===JSON_START===
contexa.streaming.json-end-marker원시 모델 출력이 구조화 페이로드를 종료할 때 JSON 청크 프로세서가 사용하는 마커===JSON_END===
contexa.streaming.timeout타임아웃 처리가 적용되기 전 스트리밍 세션의 최대 지속 시간PT5M
contexa.streaming.max-retriesStreamingProperties에 노출된 최대 재시도 횟수3
contexa.streaming.retry-delayStreamingProperties에 노출된 초기 재시도 지연PT1S
contexa.streaming.retry-multiplierStreamingProperties에 노출된 재시도 백오프 배율1.5
contexa.streaming.marker-buffer-size최종 응답 마커를 스캔하는 동안 유지되는 추가 꼬리 버퍼100
contexa.streaming.sentence-buffering-enabled부분 청크 방출 전 문장 수준 버퍼링 제어true
전체 설정 레퍼런스
모든 스트리밍 속성은 AI 설정을 참조하세요.

API 레퍼런스

StandardStreamingService
public interface StandardStreamingService
stream(AIRequest<C> request, AICoreOperations<C> aiProcessor) Flux<ServerSentEvent<String>>
AICoreOperations.processStream()을 통한 Strategy/Lab 스트리밍 경로.
stream(AIRequest<C> request, PipelineOrchestrator pipelineOrchestrator) Flux<ServerSentEvent<String>>
직접 pipeline 스트리밍 경로.
process(AIRequest<C> request, AICoreOperations<C> aiProcessor, Class<R> responseType) Mono<R>
Strategy/Lab 경로를 통한 비스트리밍 단일 응답.
process(AIRequest<C> request, PipelineOrchestrator orchestrator, Class<R> responseType) Mono<R>
Pipeline 경로를 통한 비스트리밍 단일 응답.
errorStream(String errorCode, String message) Flux<ServerSentEvent<String>>
JSON 오류 페이로드가 포함된 SSE 오류 스트림을 생성합니다.
StreamingProtocol 상수
public final class StreamingProtocol
상수
STREAMING_MARKER###STREAMING###
FINAL_RESPONSE_MARKER###FINAL_RESPONSE###
JSON_START_MARKER===JSON_START===
JSON_END_MARKER===JSON_END===
GENERATING_RESULT_MARKER###GENERATING_RESULT###
ChunkProcessor 인터페이스
public interface ChunkProcessor
process(Flux<String> upstream) Flux<String>
원시 청크의 업스트림 flux를 처리하고 변환된 출력을 반환합니다.
getProcessorType() String
이 프로세서의 문자열 식별자를 반환합니다.

내장: JsonStreamingProcessor는 프로토콜 마커를 사용한 JSON 추출과 불완전한 LLM 출력을 위한 JSON 복구 로직을 처리합니다.