override fun sse()

in http-client/http-client-java/src/main/kotlin/ai/koog/http/client/java/JavaKoogHttpClient.kt [120:204]


    /**
     * Sends [request] as a POST to [path] and exposes the line-based streaming response as a [Flow].
     *
     * Each response line is turned into a payload as follows:
     * - a line starting with `data:` yields the text after the field name, minus one optional leading space;
     * - an empty line, or a line starting with `:` (an SSE comment), is skipped;
     * - any other non-empty line is used as-is.
     *
     * A payload accepted by [dataFilter] is trimmed, decoded with [decodeStreamingResponse], mapped with
     * [processStreamingChunk] and, when the mapped value is not `null`, emitted downstream.
     *
     * The flow fails with [KoogHttpClientException] when the status code is outside `200..299`, when
     * handling a line throws, or when sending the request / reading the body throws. Handling stops at
     * the first failing line, and the response stream is closed on every exit path (completion, failure,
     * and cancellation of the collector).
     *
     * NOTE(review): `httpClient.send` and iteration over the body look like blocking calls that run in
     * the collector's context — confirm that callers collect this flow on a dispatcher suited to
     * blocking I/O.
     *
     * @param path path handed to `buildUri` together with [parameters].
     * @param request payload handed to `prepareRequestBody`; the prepared body is sent as the POST body.
     * @param requestBodyType runtime type of [request], handed to `prepareRequestBody`.
     * @param dataFilter predicate applied to each extracted (not yet trimmed) payload; only payloads for
     *   which it returns `true` are decoded.
     * @param decodeStreamingResponse decodes a trimmed payload into an intermediate value.
     * @param processStreamingChunk maps a decoded value to the emitted element; `null` results are skipped.
     * @param parameters parameters handed to `buildUri` together with [path].
     * @return a flow of the non-null results of [processStreamingChunk], in response order.
     */
    override fun <T : Any, R : Any, O : Any> sse(
        path: String,
        request: T,
        requestBodyType: KClass<T>,
        dataFilter: (String?) -> Boolean,
        decodeStreamingResponse: (String) -> R,
        processStreamingChunk: (R) -> O?,
        parameters: Map<String, String>
    ): Flow<O> = callbackFlow {
        val requestBody = prepareRequestBody(request, requestBodyType)

        val httpRequest = HttpRequest.newBuilder()
            .uri(buildUri(path, parameters))
            .POST(HttpRequest.BodyPublishers.ofString(requestBody.body))
            .header("Content-Type", requestBody.contentType)
            .header("Accept", "text/event-stream")
            .header("Cache-Control", "no-cache")
            // Note: "Connection" header is restricted in Java HttpClient and managed automatically
            .build()

        try {
            val response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofLines())

            // The line stream is AutoCloseable; `use` closes it on every exit path, including
            // the error-status branch, a processing failure and collector cancellation.
            response.body().use { lines ->
                if (response.statusCode() !in 200..299) {
                    close(
                        KoogHttpClientException(
                            clientName = clientName,
                            statusCode = response.statusCode(),
                        )
                    )
                } else {
                    logger.debug { "SSE connection opened for $clientName" }

                    // A plain `for` loop (rather than Stream.forEach) keeps the body in suspending
                    // context, so `send` can be used and `break` can stop reading early.
                    for (line in lines.iterator()) {
                        try {
                            val data = when {
                                // "data:" may be followed by one optional space, which is not part of the value.
                                line.startsWith("data:") -> line.removePrefix("data:").removePrefix(" ")
                                // Non-empty, non-comment lines without the data prefix are forwarded unchanged.
                                line.isNotEmpty() && !line.startsWith(":") -> line
                                else -> null
                            }

                            if (data != null && dataFilter(data)) {
                                data.trim()
                                    .let(decodeStreamingResponse)
                                    .let(processStreamingChunk)
                                    // `send` suspends while the buffer is full instead of dropping the chunk.
                                    ?.let { send(it) }
                            }
                        } catch (e: CancellationException) {
                            throw e
                        } catch (e: Exception) {
                            close(
                                KoogHttpClientException(
                                    clientName = clientName,
                                    message = "Error processing SSE event: ${e.message}",
                                    cause = e
                                )
                            )
                            // The flow has already failed; do not keep reading and decoding lines.
                            break
                        }
                    }

                    logger.debug { "SSE connection closed for $clientName" }
                    // No-op when the channel was already closed with an error above.
                    close()
                }
            }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            close(
                KoogHttpClientException(
                    clientName = clientName,
                    message = "Exception during streaming: ${e.message}",
                    cause = e
                )
            )
        }

        // Every path that reaches this point has already closed the channel, and the response
        // stream is released by `use` above, so there is nothing left to clean up here.
        awaitClose {
            // Cleanup if needed
        }
    }