in http-client/http-client-java/src/main/kotlin/ai/koog/http/client/java/JavaKoogHttpClient.kt [120:204]
/**
 * Sends [request] as a POST to [path] and exposes the Server-Sent Events response as a [Flow].
 *
 * Each response line is reduced to a candidate payload: the text after a leading `"data: "`,
 * or the whole line when it is non-empty and does not start with `:`. Empty lines and lines
 * starting with `:` are skipped. Payloads accepted by [dataFilter] are trimmed, decoded with
 * [decodeStreamingResponse], mapped with [processStreamingChunk], and emitted when the result
 * is non-null.
 *
 * The blocking HTTP exchange runs on `Dispatchers.IO`; elements are delivered with the
 * suspending `send`, so a slow collector applies backpressure instead of losing events.
 * The flow fails with a [KoogHttpClientException] on a non-2xx status, on the first payload
 * that cannot be processed, or on any other error raised while streaming.
 *
 * @param path request path, resolved together with [parameters] by `buildUri`.
 * @param request body object, serialized by `prepareRequestBody`.
 * @param requestBodyType runtime type of [request] used for serialization.
 * @param dataFilter predicate deciding whether a candidate payload is decoded.
 * @param decodeStreamingResponse decoder applied to each accepted, trimmed payload.
 * @param processStreamingChunk maps a decoded chunk to an output element; `null` results are skipped.
 * @param parameters query parameters passed to `buildUri`.
 * @return a flow of processed chunks that completes when the response stream ends.
 */
override fun <T : Any, R : Any, O : Any> sse(
    path: String,
    request: T,
    requestBodyType: KClass<T>,
    dataFilter: (String?) -> Boolean,
    decodeStreamingResponse: (String) -> R,
    processStreamingChunk: (R) -> O?,
    parameters: Map<String, String>
): Flow<O> = callbackFlow {
    val requestBody = prepareRequestBody(request, requestBodyType)
    val httpRequest = HttpRequest.newBuilder()
        .uri(buildUri(path, parameters))
        .POST(HttpRequest.BodyPublishers.ofString(requestBody.body))
        .header("Content-Type", requestBody.contentType)
        .header("Accept", "text/event-stream")
        .header("Cache-Control", "no-cache")
        // Note: "Connection" header is restricted in Java HttpClient and managed automatically
        .build()

    try {
        // HttpClient.send and the line iteration block the thread, so keep them off the
        // collector's dispatcher. Fully qualified names avoid touching the file's imports.
        kotlinx.coroutines.withContext(kotlinx.coroutines.Dispatchers.IO) {
            val response = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofLines())

            // The line stream must be closed unless it is read to the end; `use` guarantees that
            // on early exit (bad status, processing error, cancellation) as well.
            response.body().use { lines ->
                if (response.statusCode() !in 200..299) {
                    this@callbackFlow.close(
                        KoogHttpClientException(
                            clientName = clientName,
                            statusCode = response.statusCode(),
                        )
                    )
                } else {
                    logger.debug { "SSE connection opened for $clientName" }

                    val dataPrefix = "data: "
                    // An explicit iterator (instead of Stream.forEach) lets us call the suspending
                    // `send` and stop reading as soon as something goes wrong.
                    for (line in lines.iterator()) {
                        try {
                            // SSE format: "data: <content>"
                            val data = when {
                                line.startsWith(dataPrefix) -> line.substring(dataPrefix.length)
                                line.isNotEmpty() && !line.startsWith(":") -> line
                                else -> null
                            }

                            if (data != null && dataFilter(data)) {
                                data.trim()
                                    .let(decodeStreamingResponse)
                                    .let(processStreamingChunk)
                                    // `send` suspends while the buffer is full and throws
                                    // CancellationException once the collector is gone.
                                    ?.let { this@callbackFlow.send(it) }
                            }
                        } catch (e: CancellationException) {
                            throw e
                        } catch (e: Exception) {
                            this@callbackFlow.close(
                                KoogHttpClientException(
                                    clientName = clientName,
                                    message = "Error processing SSE event: ${e.message}",
                                    cause = e
                                )
                            )
                            // The flow has already failed; do not keep consuming the stream.
                            break
                        }
                    }

                    logger.debug { "SSE connection closed for $clientName" }
                }
            }
        }
        // No-op if the channel was already closed with an error above.
        close()
    } catch (e: CancellationException) {
        throw e
    } catch (e: Exception) {
        close(
            KoogHttpClientException(
                clientName = clientName,
                message = "Exception during streaming: ${e.message}",
                cause = e
            )
        )
    }

    awaitClose {
        // Nothing to release here: the response stream is closed by `use` above.
    }
}