override fun sse()

in http-client/http-client-ktor/src/commonMain/kotlin/ai/koog/http/client/ktor/KtorKoogHttpClient.kt [121:186]


    /**
     * Returns a [Flow] that, when collected, opens a server-sent events (SSE) connection with a
     * POST request to [path] and emits the processed events.
     *
     * Each incoming event goes through the following pipeline:
     * 1. [dataFilter] is called with the event's `data`; events it rejects are dropped.
     * 2. Events whose `data` is `null` are dropped; otherwise the data is trimmed.
     * 3. The trimmed data is decoded with [decodeStreamingResponse].
     * 4. The decoded value is mapped with [processStreamingChunk]; `null` results are dropped,
     *    everything else is emitted downstream.
     *
     * @param path URL string the SSE request is sent to.
     * @param request request payload. Sent as a plain string when [requestBodyType] is [String],
     *   otherwise handed to Ktor together with a [TypeInfo] built from [requestBodyType].
     * @param requestBodyType runtime class of [request], used to pick how the body is set.
     * @param dataFilter predicate over an event's `data` deciding whether the event is processed.
     * @param decodeStreamingResponse converts the trimmed event data into an intermediate value.
     * @param processStreamingChunk converts the intermediate value into the emitted element, or
     *   returns `null` to skip it.
     * @param parameters key/value pairs added to the request via `parameter`.
     * @throws KoogHttpClientException when Ktor reports an [SSEClientException] (carrying the
     *   response status code and, if it could be read, the response body) or when any other
     *   non-cancellation [Exception] occurs while streaming.
     */
    override fun <T : Any, R : Any, O : Any> sse(
        path: String,
        request: T,
        requestBodyType: KClass<T>,
        dataFilter: (String?) -> Boolean,
        decodeStreamingResponse: (String) -> R,
        processStreamingChunk: (R) -> O?,
        parameters: Map<String, String>,
    ): Flow<O> = flow {
        logger.debug { "Opening sse connection for $clientName" }

        @Suppress("TooGenericExceptionCaught")
        try {
            ktorClient.sse(
                urlString = path,
                request = {
                    method = HttpMethod.Post
                    parameters.forEach { (key, value) ->
                        parameter(key, value)
                    }
                    accept(ContentType.Text.EventStream)
                    headers {
                        append(HttpHeaders.CacheControl, "no-cache")
                        append(HttpHeaders.Connection, "keep-alive")
                    }
                    // String payloads are sent verbatim; any other type is passed along with its TypeInfo.
                    if (requestBodyType == String::class) {
                        @Suppress("UNCHECKED_CAST")
                        setBody(request as String)
                    } else {
                        setBody(request, TypeInfo(requestBodyType))
                    }
                }
            ) {
                incoming.collect { event ->
                    // Each `?.` step short-circuits, so filtered-out events, events without data and
                    // chunks mapped to null are silently skipped.
                    event
                        .takeIf { dataFilter.invoke(it.data) }
                        ?.data?.trim()
                        ?.let(decodeStreamingResponse)
                        ?.let(processStreamingChunk)
                        ?.let { emit(it) }
                }
            }
        } catch (e: SSEClientException) {
            // Reading the body is best-effort: a failure here must not hide the original SSE error.
            val errorBody = try {
                e.response?.bodyAsText()
            } catch (cancellation: CancellationException) {
                // bodyAsText() suspends; cancellation must propagate instead of being logged and
                // converted into a KoogHttpClientException (mirrors the outer handler below).
                throw cancellation
            } catch (ignored: Exception) {
                logger.debug(ignored) { "Unable to read SSE error response body (may already be consumed)" }
                null
            }
            throw KoogHttpClientException(
                clientName = clientName,
                statusCode = e.response?.status?.value,
                errorBody = errorBody,
                message = e.message,
                cause = e
            )
        } catch (e: CancellationException) {
            // Never wrap cancellation: it has to reach the coroutine machinery unchanged.
            throw e
        } catch (e: Exception) {
            throw KoogHttpClientException(
                clientName = clientName,
                message = "Exception during streaming: ${e.message}",
                cause = e,
            )
        }
    }