in source/h1_connection.c [1425:1519]
static int s_try_process_next_midchannel_read_message(struct aws_h1_connection *connection, bool *out_stop_processing) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
AWS_ASSERT(connection->thread_data.has_switched_protocols);
AWS_ASSERT(!connection->thread_data.is_reading_stopped);
AWS_ASSERT(!aws_linked_list_empty(&connection->thread_data.read_buffer.messages));
*out_stop_processing = false;
struct aws_io_message *sending_msg = NULL;
if (!connection->base.channel_slot->adj_right) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_CONNECTION,
"id=%p: Connection has switched protocols, but no handler is installed to deal with this data.",
(void *)connection);
return aws_raise_error(AWS_ERROR_HTTP_SWITCHED_PROTOCOLS);
}
size_t downstream_window = aws_channel_slot_downstream_read_window(connection->base.channel_slot);
if (downstream_window == 0) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Downstream window is 0, cannot send switched-protocol message now.",
(void *)&connection->base);
*out_stop_processing = true;
return AWS_OP_SUCCESS;
}
struct aws_linked_list_node *queued_msg_node = aws_linked_list_front(&connection->thread_data.read_buffer.messages);
struct aws_io_message *queued_msg = AWS_CONTAINER_OF(queued_msg_node, struct aws_io_message, queueing_handle);
/* Note that copy_mark is used to mark the progress of partially sent messages. */
AWS_ASSERT(queued_msg->message_data.len > queued_msg->copy_mark);
size_t sending_bytes = aws_min_size(queued_msg->message_data.len - queued_msg->copy_mark, downstream_window);
AWS_ASSERT(connection->thread_data.read_buffer.pending_bytes >= sending_bytes);
connection->thread_data.read_buffer.pending_bytes -= sending_bytes;
/* If we can't send the whole entire queued_msg, copy its data into a new aws_io_message and send that. */
if (sending_bytes != queued_msg->message_data.len) {
sending_msg = aws_channel_acquire_message_from_pool(
connection->base.channel_slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, sending_bytes);
if (!sending_msg) {
goto error;
}
aws_byte_buf_write(
&sending_msg->message_data, queued_msg->message_data.buffer + queued_msg->copy_mark, sending_bytes);
queued_msg->copy_mark += sending_bytes;
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Sending %zu bytes switched-protocol message to downstream handler, %zu bytes remain.",
(void *)&connection->base,
sending_bytes,
queued_msg->message_data.len - queued_msg->copy_mark);
/* If the last of queued_msg has been copied, it can be deleted now. */
if (queued_msg->copy_mark == queued_msg->message_data.len) {
aws_linked_list_remove(queued_msg_node);
aws_mem_release(queued_msg->allocator, queued_msg);
}
} else {
/* Sending all of queued_msg along. */
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Sending full switched-protocol message of size %zu to downstream handler.",
(void *)&connection->base,
queued_msg->message_data.len);
aws_linked_list_remove(queued_msg_node);
sending_msg = queued_msg;
}
int err = aws_channel_slot_send_message(connection->base.channel_slot, sending_msg, AWS_CHANNEL_DIR_READ);
if (err) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_CONNECTION,
"id=%p: Failed to send message in read direction, error %d (%s).",
(void *)&connection->base,
aws_last_error(),
aws_error_name(aws_last_error()));
goto error;
}
return AWS_OP_SUCCESS;
error:
if (sending_msg) {
aws_mem_release(sending_msg->allocator, sending_msg);
}
return AWS_OP_ERR;
}