override fun prepareResponse()

in anthropic-java-bedrock/src/main/kotlin/com/anthropic/bedrock/backends/BedrockBackend.kt [244:333]


    /**
     * Translates a streamed Bedrock response from AWS EventStream framing into a stream of
     * server-sent events (SSE).
     *
     * A response whose [HEADER_CONTENT_TYPE] values do not include [CONTENT_TYPE_AWS_EVENT_STREAM]
     * is returned unchanged. Otherwise the returned response reports the same status code, the
     * same headers with [HEADER_CONTENT_TYPE] replaced by [CONTENT_TYPE_SSE_STREAM], and a body
     * that yields one SSE per decoded EventStream message.
     *
     * The translation runs as a task on [sseThreadPool]. If that task fails (I/O error on the
     * original body, malformed event payload, ...), the translated body throws an
     * [java.io.IOException] carrying the failure as its cause once the already-translated data has
     * been read, instead of reporting a clean end-of-stream.
     *
     * @param response the response received from the HTTP client.
     * @return [response] itself when it is not an EventStream response; otherwise a response
     *   wrapping the translated SSE body.
     * @throws AnthropicInvalidDataException if an EventStream response's
     *   [HEADER_PAYLOAD_CONTENT_TYPE] values do not include [CONTENT_TYPE_JSON].
     */
    override fun prepareResponse(response: HttpResponse): HttpResponse {
        if (
            !response.headers().values(HEADER_CONTENT_TYPE).contains(CONTENT_TYPE_AWS_EVENT_STREAM)
        ) {
            return response
        }

        val payloadContentType = response.headers().values(HEADER_PAYLOAD_CONTENT_TYPE)

        if (!payloadContentType.contains(CONTENT_TYPE_JSON)) {
            throw AnthropicInvalidDataException(
                "Expected streamed Bedrock events to have content type of " +
                    "$CONTENT_TYPE_JSON, but was $payloadContentType."
            )
        }

        val responseInput = response.body()
        val pipedInput = PipedInputStream()
        val pipedOutput = PipedOutputStream(pipedInput)

        // Failure raised on the translation thread, if any. It is recorded *before* the piped
        // output stream is closed, so that a reader observing end-of-file can always tell a
        // completed stream from one that was cut short by an error.
        val translationFailure = java.util.concurrent.atomic.AtomicReference<Throwable?>()

        // spotless:off
        //
        // A decoded AWS EventStream message's payload is JSON. It might look like this (abridged):
        //
        //   {"bytes":"eyJ0eXBlIjoi...ZXJlIn19","p":"abcdefghijkl"}
        //
        // The value of the "bytes" field is a base-64 encoded JSON string (UTF-8). When decoded, it
        // might look like this:
        //
        //   {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
        //
        // Parse the "type" field to allow the construction of a server-sent event (SSE) that might
        // look like this:
        //
        //   event: content_block_delta
        //   data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}
        //
        // Print the SSE (with a blank line after) to the piped output stream to complete the
        // translation process.
        //
        // A thread avoids deadlocking the pipe. If everything is on the same thread, a "read" that
        // blocks waiting for more data to be written, would block the thread from executing the
        // necessary "write" and cause a deadlock.
        //
        // spotless:on
        sseThreadPool.execute {
            responseInput.use { input ->
                // "use" closes the piped output stream when done, which signals
                // the end-of-file to the reader of the piped input stream.
                pipedOutput.use { output ->
                    try {
                        // When fed enough data (see loop, below) to create a new
                        // "Message", the "Consumer.accept" lambda here is fired.
                        val messageDecoder = MessageDecoder { message ->
                            // A missing field is reported explicitly rather than as a bare
                            // NullPointerException from dereferencing the absent node.
                            val encodedEvent =
                                jsonMapper.readTree(message.payload).get("bytes")?.asText()
                                    ?: throw AnthropicInvalidDataException(
                                        "Expected streamed Bedrock event payload to have a " +
                                            "\"bytes\" field."
                                    )
                            val sseJson =
                                String(Base64.getDecoder().decode(encodedEvent), Charsets.UTF_8)
                            val sseEventType =
                                jsonMapper.readTree(sseJson).get("type")?.asText()
                                    ?: throw AnthropicInvalidDataException(
                                        "Expected decoded Bedrock event to have a \"type\" field."
                                    )

                            output.write("event: $sseEventType\ndata: $sseJson\n\n".toByteArray())
                            output.flush()
                        }

                        val buffer = ByteArray(4096)
                        var bytesRead: Int
                        while (input.read(buffer).also { bytesRead = it } != -1) {
                            messageDecoder.feed(buffer, 0, bytesRead)
                        }
                    } catch (e: Throwable) {
                        // Record the failure while the pipe is still open, then rethrow so the
                        // executing thread handles it exactly as it would have without this catch.
                        translationFailure.set(e)
                        throw e
                    }
                }
            }
        }

        // Reads from the pipe, but converts an end-of-file that was caused by a translation
        // failure into an exception so the caller does not mistake a truncated stream for a
        // complete one.
        val sseBody =
            object : InputStream() {
                override fun read(): Int = failIfTruncated(pipedInput.read())

                override fun read(b: ByteArray, off: Int, len: Int): Int =
                    failIfTruncated(pipedInput.read(b, off, len))

                override fun available(): Int = pipedInput.available()

                override fun close() = pipedInput.close()

                private fun failIfTruncated(result: Int): Int {
                    if (result == -1) {
                        translationFailure.get()?.let { cause ->
                            throw java.io.IOException(
                                "Failed to translate the Bedrock event stream to server-sent events.",
                                cause,
                            )
                        }
                    }
                    return result
                }
            }

        return object : HttpResponse {
            override fun statusCode(): Int = response.statusCode()

            override fun headers(): Headers =
                response
                    .headers()
                    .toBuilder()
                    .replace(HEADER_CONTENT_TYPE, CONTENT_TYPE_SSE_STREAM)
                    .build()

            override fun body(): InputStream = sseBody

            // Closing the pipe makes the translation task's next write fail, which in turn closes
            // the original response body via "use" above.
            // NOTE(review): the original response is therefore only released once the task wakes
            // up; confirm whether closing "response" directly here is safe for the HTTP client.
            override fun close() = pipedInput.close()
        }
    }