override fun executeStreaming()

in prompt/prompt-executor/prompt-executor-clients/prompt-executor-bedrock-client/src/jvmMain/kotlin/ai/koog/prompt/executor/clients/bedrock/BedrockLLMClient.kt [277:354]


    /**
     * Streams a chat completion for [prompt] from Amazon Bedrock via `InvokeModelWithResponseStream`.
     *
     * The request body is produced by `createRequestBody` for [model] and [tools]. Every raw JSON chunk
     * delivered by Bedrock is forwarded into an intermediate `channelFlow`. Blank chunks are dropped, and the
     * rest are parsed by the stream parser that matches the model's Bedrock family (AI21 Jamba, Amazon Nova,
     * Meta Llama or Anthropic Claude).
     *
     * Stream events that are not payload chunks, chunks without bytes, and a missing response body are
     * logged and skipped rather than failing the stream.
     *
     * @param prompt the prompt to send.
     * @param model the Bedrock model to invoke; it is checked for [LLMCapability.Completion] up front.
     * @param tools tool descriptors passed on to the request body.
     * @return a [Flow] of [StreamFrame]s. The Bedrock call itself is issued from a `channelFlow`, so it runs when
     *   the returned flow is collected (assuming the per-family transformers do not collect eagerly — confirm).
     * @throws LLMClientException when this function is called (not on collection) for embedding-only families
     *   (Titan Embedding, Cohere). Errors raised while the stream is being consumed close the flow with an
     *   [LLMClientException] that wraps the original cause; cancellation is propagated unchanged.
     */
    override fun executeStreaming(
        prompt: Prompt,
        model: LLModel,
        tools: List<ToolDescriptor>
    ): Flow<StreamFrame> {
        logger.debug { "Executing streaming prompt for model: ${model.id}" }
        val modelFamily = getBedrockModelFamily(model)
        model.requireCapability(LLMCapability.Completion, "Model ${model.id} does not support chat completions")
        val requestBody = createRequestBody(prompt, model, tools)
        val streamRequest = InvokeModelWithResponseStreamRequest {
            this.modelId = model.id
            this.contentType = "application/json"
            this.accept = "*/*"
            this.body = requestBody.encodeToByteArray()
        }
        logger.debug { "Bedrock InvokeModelWithResponseStream Request: ModelID: ${model.id}, Body: $requestBody" }

        // Emits the raw JSON chunk strings exactly as Bedrock delivers them; parsing happens per family below.
        return channelFlow {
            try {
                withContext(Dispatchers.SuitableForIO) {
                    bedrockClient.invokeModelWithResponseStream(
                        streamRequest
                    ) { response: InvokeModelWithResponseStreamResponse ->
                        val eventStream = response.body
                        if (eventStream == null) {
                            // Surface a missing body instead of silently producing an empty stream.
                            logger.warn { "Received no response stream body for model ${model.id}" }
                        } else {
                            eventStream.collect { event: ResponseStream ->
                                // asChunk() casts unconditionally and would abort the whole stream on any
                                // non-Chunk union variant (e.g. SdkUnknown); skip such events instead.
                                val payloadPart = event.asChunkOrNull()
                                val chunkBytes = payloadPart?.bytes
                                when {
                                    payloadPart == null ->
                                        logger.warn { "Skipping non-chunk stream event for model ${model.id}: $event" }

                                    chunkBytes == null ->
                                        logger.warn { "Received null chunk bytes in stream for model ${model.id}" }

                                    else -> {
                                        val chunkJsonString = chunkBytes.decodeToString()
                                        send(chunkJsonString)
                                        logger.trace { "Bedrock Stream Chunk for model ${model.id}: $chunkJsonString" }
                                    }
                                }
                            }
                        }
                    }
                }
            } catch (e: CancellationException) {
                // Never wrap cancellation: structured concurrency relies on it propagating unchanged.
                throw e
            } catch (e: Exception) {
                val exception = LLMClientException(
                    clientName = clientName,
                    message = "Error in Bedrock streaming for model ${model.id}",
                    cause = e
                )
                logger.error(exception) { exception.message }
                // Closing the channel with the exception rethrows it to the collector of the flow.
                close(exception)
            }
        }.filterNot {
            it.isBlank()
        }.run {
            // Dispatch to the family-specific parser; this `when` runs eagerly when executeStreaming is called.
            when (modelFamily) {
                is BedrockModelFamilies.AI21Jamba -> genericProcessStream(
                    this,
                    BedrockAI21JambaSerialization::parseJambaStreamChunk
                )

                is BedrockModelFamilies.AmazonNova -> genericProcessStream(
                    this,
                    BedrockAmazonNovaSerialization::parseNovaStreamChunk
                )

                is BedrockModelFamilies.Meta -> genericProcessStream(
                    this,
                    BedrockMetaLlamaSerialization::parseLlamaStreamChunk
                )

                is BedrockModelFamilies.AnthropicClaude -> BedrockAnthropicClaudeSerialization.transformAnthropicStreamChunks(
                    chunkJsonStringFlow = this,
                    clock = clock,
                )

                is BedrockModelFamilies.TitanEmbedding, is BedrockModelFamilies.Cohere ->
                    throw LLMClientException(
                        clientName,
                        "Embedding models do not support streaming chat completions. Use embed() instead."
                    )
            }
        }
    }