in prompt/prompt-executor/prompt-executor-clients/prompt-executor-bedrock-client/src/jvmMain/kotlin/ai/koog/prompt/executor/clients/bedrock/BedrockLLMClient.kt [277:354]
/**
 * Streams a chat completion from Amazon Bedrock using `InvokeModelWithResponseStream`.
 *
 * The request body is built by [createRequestBody] and sent as JSON. Each payload chunk of the
 * response stream is decoded from UTF-8 into a JSON string. Blank strings are dropped, and the rest
 * are parsed into [StreamFrame]s by the serializer that matches the model's Bedrock family.
 *
 * Stream events that are not payload chunks, and chunks that carry no bytes, are skipped with a
 * warning rather than aborting the whole stream.
 *
 * @param prompt the prompt to send.
 * @param model the Bedrock model to invoke; it must declare [LLMCapability.Completion].
 * @param tools tool descriptors passed to [createRequestBody].
 * @return a cold flow of stream frames. Failures while invoking or reading the Bedrock stream
 *   terminate the flow with an [LLMClientException]. Coroutine cancellation is propagated unchanged.
 * @throws LLMClientException eagerly (before any flow is returned) when the model belongs to an
 *   embedding family (Titan Embedding, Cohere), unless the capability check has already rejected it.
 */
override fun executeStreaming(
    prompt: Prompt,
    model: LLModel,
    tools: List<ToolDescriptor>
): Flow<StreamFrame> {
    logger.debug { "Executing streaming prompt for model: ${model.id}" }
    val modelFamily = getBedrockModelFamily(model)
    model.requireCapability(LLMCapability.Completion, "Model ${model.id} does not support chat completions")
    val requestBody = createRequestBody(prompt, model, tools)
    val streamRequest = InvokeModelWithResponseStreamRequest {
        this.modelId = model.id
        this.contentType = "application/json"
        this.accept = "*/*"
        this.body = requestBody.encodeToByteArray()
    }
    logger.debug { "Bedrock InvokeModelWithResponseStream Request: ModelID: ${model.id}, Body: $requestBody" }
    return channelFlow {
        try {
            // The SDK call runs on the I/O-suitable dispatcher. Calling `send` from inside
            // withContext is allowed because channelFlow's channel accepts sends from any context.
            withContext(Dispatchers.SuitableForIO) {
                bedrockClient.invokeModelWithResponseStream(
                    streamRequest
                ) { response: InvokeModelWithResponseStreamResponse ->
                    response.body?.collect { event: ResponseStream ->
                        // `asChunk()` casts unconditionally and would fail the whole stream on any
                        // non-chunk event, so use the null-returning accessor and skip such events.
                        val payload = event.asChunkOrNull()
                        val chunkBytes = payload?.bytes
                        when {
                            payload == null ->
                                logger.warn {
                                    "Skipping non-chunk stream event for model ${model.id}: ${event::class.simpleName}"
                                }
                            chunkBytes == null ->
                                logger.warn { "Received null chunk bytes in stream for model ${model.id}" }
                            else -> {
                                val chunkJsonString = chunkBytes.decodeToString()
                                send(chunkJsonString)
                                logger.trace { "Bedrock Stream Chunk for model ${model.id}: $chunkJsonString" }
                            }
                        }
                    }
                }
            }
        } catch (e: CancellationException) {
            // Never wrap cancellation; structured concurrency depends on it propagating as-is.
            throw e
        } catch (e: Exception) {
            val exception = LLMClientException(
                clientName = clientName,
                message = "Error in Bedrock streaming for model ${model.id}",
                cause = e
            )
            logger.error(exception) { exception.message }
            // Closing the channel with a cause makes collectors of this flow see the exception.
            close(exception)
        }
    }.filterNot {
        it.isBlank()
    }.run {
        // Each Bedrock model family uses its own streaming JSON format, so choose the matching parser.
        // Note: this `when` runs eagerly, so the embedding branch throws before any flow is returned.
        when (modelFamily) {
            is BedrockModelFamilies.AI21Jamba -> genericProcessStream(
                this,
                BedrockAI21JambaSerialization::parseJambaStreamChunk
            )
            is BedrockModelFamilies.AmazonNova -> genericProcessStream(
                this,
                BedrockAmazonNovaSerialization::parseNovaStreamChunk
            )
            is BedrockModelFamilies.Meta -> genericProcessStream(
                this,
                BedrockMetaLlamaSerialization::parseLlamaStreamChunk
            )
            is BedrockModelFamilies.AnthropicClaude -> BedrockAnthropicClaudeSerialization.transformAnthropicStreamChunks(
                chunkJsonStringFlow = this,
                clock = clock,
            )
            is BedrockModelFamilies.TitanEmbedding, is BedrockModelFamilies.Cohere ->
                throw LLMClientException(
                    clientName,
                    "Embedding models do not support streaming chat completions. Use embed() instead."
                )
        }
    }
}