in http-client/http-client-ktor/src/commonMain/kotlin/ai/koog/http/client/ktor/KtorKoogHttpClient.kt [121:186]
/**
 * Opens a server-sent-events connection with a POST request to [path] and exposes the
 * processed events as a cold [Flow]; the connection is opened when the flow is collected.
 *
 * The request is sent with `Accept: text/event-stream`, `Cache-Control: no-cache` and
 * `Connection: keep-alive`. For every incoming event the following pipeline is applied:
 * 1. the event is kept only if [dataFilter] returns `true` for its data;
 * 2. events whose data is `null` are dropped, otherwise the data is trimmed;
 * 3. the trimmed data is passed to [decodeStreamingResponse];
 * 4. the decoded value is passed to [processStreamingChunk], and a non-null result is emitted.
 *
 * @param path URL string the SSE request is sent to.
 * @param request Request body. Sent as-is when [requestBodyType] is [String]; otherwise it is
 *   set together with a `TypeInfo` built from [requestBodyType].
 * @param requestBodyType Runtime class of [request].
 * @param dataFilter Predicate over the (possibly `null`) event data deciding whether the event is handled.
 * @param decodeStreamingResponse Converts trimmed event data into an intermediate value.
 * @param processStreamingChunk Converts the intermediate value into the emitted element;
 *   returning `null` skips the event.
 * @param parameters Key/value pairs applied to the request with `parameter(key, value)`.
 * @return A flow of the non-null results of [processStreamingChunk].
 * @throws KoogHttpClientException During collection, when an `SSEClientException` is caught
 *   (carrying the response status code and body when they are available) or when any other
 *   [Exception] is caught while streaming. [CancellationException] is always rethrown unchanged.
 */
override fun <T : Any, R : Any, O : Any> sse(
    path: String,
    request: T,
    requestBodyType: KClass<T>,
    dataFilter: (String?) -> Boolean,
    decodeStreamingResponse: (String) -> R,
    processStreamingChunk: (R) -> O?,
    parameters: Map<String, String>,
): Flow<O> = flow {
    logger.debug { "Opening sse connection for $clientName" }
    @Suppress("TooGenericExceptionCaught")
    try {
        ktorClient.sse(
            urlString = path,
            request = {
                method = HttpMethod.Post
                parameters.forEach { (key, value) ->
                    parameter(key, value)
                }
                accept(ContentType.Text.EventStream)
                headers {
                    append(HttpHeaders.CacheControl, "no-cache")
                    append(HttpHeaders.Connection, "keep-alive")
                }
                // A String body is set directly; any other body is set with explicit type information.
                if (requestBodyType == String::class) {
                    @Suppress("UNCHECKED_CAST")
                    setBody(request as String)
                } else {
                    setBody(request, TypeInfo(requestBodyType))
                }
            }
        ) {
            incoming.collect { event ->
                // Each `?.` step short-circuits, so a filtered-out event, null data,
                // or a null processing result simply emits nothing.
                event
                    .takeIf { dataFilter.invoke(it.data) }
                    ?.data?.trim()
                    ?.let(decodeStreamingResponse)
                    ?.let(processStreamingChunk)
                    ?.let { emit(it) }
            }
        }
    } catch (e: SSEClientException) {
        // Reading the error body is best-effort: a failure here must not mask the original error.
        val errorBody = try {
            e.response?.bodyAsText()
        } catch (cancellation: CancellationException) {
            // bodyAsText() suspends; cancellation while reading must propagate instead of
            // being swallowed by the generic handler below and replaced with a client exception.
            throw cancellation
        } catch (ignored: Exception) {
            logger.debug(ignored) { "Unable to read SSE error response body (may already be consumed)" }
            null
        }
        throw KoogHttpClientException(
            clientName = clientName,
            statusCode = e.response?.status?.value,
            errorBody = errorBody,
            message = e.message,
            cause = e
        )
    } catch (e: CancellationException) {
        // Never wrap cancellation: it has to reach the coroutine machinery unchanged.
        throw e
    } catch (e: Exception) {
        throw KoogHttpClientException(
            clientName = clientName,
            message = "Exception during streaming: ${e.message}",
            cause = e,
        )
    }
}