in anthropic-java-bedrock/src/main/kotlin/com/anthropic/bedrock/backends/BedrockBackend.kt [244:333]
/**
 * Translates a streamed Bedrock response from the AWS EventStream encoding into a stream of
 * server-sent events (SSE).
 *
 * A response whose content type is not [CONTENT_TYPE_AWS_EVENT_STREAM] is returned unchanged.
 * Otherwise, the returned response reports the same status code, the same headers with the
 * content type replaced by [CONTENT_TYPE_SSE_STREAM], and a body from which the translated
 * events can be read. The translation runs on a thread from `sseThreadPool`, which also closes
 * the body of the original [response] when it is done.
 *
 * If the translation fails (for example, because an event's payload is malformed), the reader
 * of the returned body first receives all events translated before the failure and then a
 * [java.io.IOException] carrying the failure as its cause, instead of a normal end-of-stream.
 *
 * Closing the returned response closes only the reading end of the pipe.
 *
 * @param response The response received from Bedrock.
 * @return The given [response], or a wrapper presenting its events as an SSE stream.
 * @throws AnthropicInvalidDataException If the response is an event stream whose payload
 *   content type is not [CONTENT_TYPE_JSON].
 */
override fun prepareResponse(response: HttpResponse): HttpResponse {
    if (
        !response.headers().values(HEADER_CONTENT_TYPE).contains(CONTENT_TYPE_AWS_EVENT_STREAM)
    ) {
        return response
    }

    val payloadContentType = response.headers().values(HEADER_PAYLOAD_CONTENT_TYPE)
    if (!payloadContentType.contains(CONTENT_TYPE_JSON)) {
        throw AnthropicInvalidDataException(
            "Expected streamed Bedrock events to have content type of " +
                "$CONTENT_TYPE_JSON, but was $payloadContentType."
        )
    }

    val responseInput = response.body()
    val pipedInput = PipedInputStream()
    val pipedOutput = PipedOutputStream(pipedInput)

    // Holds the exception (if any) that ended the translation early, so that it can be reported
    // to the reader. Fully-qualified names are used for types this function newly depends on.
    val translationFailure = java.util.concurrent.atomic.AtomicReference<Exception?>()

    // spotless:off
    //
    // A decoded AWS EventStream message's payload is JSON. It might look like this (abridged):
    //
    //     {"bytes":"eyJ0eXBlIjoi...ZXJlIn19","p":"abcdefghijkl"}
    //
    // The value of the "bytes" field is a base-64 encoded JSON string (UTF-8). When decoded, it
    // might look like this:
    //
    //     {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
    //
    // Parse the "type" field to allow the construction of a server-sent event (SSE) that might
    // look like this:
    //
    //     event: content_block_delta
    //     data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
    //
    // Print the SSE (with a blank line after) to the piped output stream to complete the
    // translation process.
    //
    // A thread avoids deadlocking the pipe. If everything is on the same thread, a "read" that
    // blocks waiting for more data to be written, would block the thread from executing the
    // necessary "write" and cause a deadlock.
    //
    // spotless:on
    sseThreadPool.execute {
        responseInput.use { input ->
            // "use" closes the piped output stream when done, which signals
            // the end-of-file to the reader of the piped input stream.
            pipedOutput.use { output ->
                try {
                    // When fed enough data (see loop, below) to create a new
                    // "Message", the "Consumer.accept" lambda here is fired.
                    val messageDecoder = MessageDecoder { message ->
                        val payload = jsonMapper.readTree(message.payload)
                        val encodedEvent =
                            payload?.get("bytes")?.asText()
                                ?: throw AnthropicInvalidDataException(
                                    "Expected streamed Bedrock event to have a \"bytes\" " +
                                        "field, but was $payload."
                                )
                        val sseJson = String(Base64.getDecoder().decode(encodedEvent))
                        val sseEventType =
                            jsonMapper.readTree(sseJson)?.get("type")?.asText()
                                ?: throw AnthropicInvalidDataException(
                                    "Expected decoded Bedrock event to have a \"type\" " +
                                        "field, but was $sseJson."
                                )
                        output.write("event: $sseEventType\ndata: $sseJson\n\n".toByteArray())
                        output.flush()
                    }
                    val buffer = ByteArray(4096)
                    var bytesRead: Int
                    while (input.read(buffer).also { bytesRead = it } != -1) {
                        messageDecoder.feed(buffer, 0, bytesRead)
                    }
                } catch (e: Exception) {
                    // Record the failure *before* "use" closes the piped output stream, so
                    // that it is already visible when the reader observes the end-of-file.
                    // This also covers the write failing because the reader closed its end
                    // of the pipe early; in that case nobody reads the recorded failure.
                    translationFailure.set(e)
                }
            }
        }
    }

    // Distinguishes a completed stream from one cut short by a translation failure: at the
    // end-of-file, a recorded failure is thrown instead of returning -1. "read(ByteArray)" is
    // covered too, as "FilterInputStream" routes it through "read(ByteArray, Int, Int)".
    val sseInput =
        object : java.io.FilterInputStream(pipedInput) {
            override fun read(): Int = checkEndOfStream(super.read())

            override fun read(b: ByteArray, off: Int, len: Int): Int =
                checkEndOfStream(super.read(b, off, len))

            private fun checkEndOfStream(result: Int): Int {
                if (result == -1) {
                    translationFailure.get()?.let { cause ->
                        throw java.io.IOException(
                            "Failed to translate the Bedrock event stream to server-sent events.",
                            cause
                        )
                    }
                }
                return result
            }
        }

    return object : HttpResponse {
        override fun statusCode(): Int = response.statusCode()

        override fun headers(): Headers =
            response
                .headers()
                .toBuilder()
                .replace(HEADER_CONTENT_TYPE, CONTENT_TYPE_SSE_STREAM)
                .build()

        override fun body(): InputStream = sseInput

        override fun close() = sseInput.close()
    }
}