in source/event_stream_channel_handler.c [251:325]
static void s_write_handler_message(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
(void)task;
struct message_write_data *message_data = arg;
AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "static: Write message task invoked.");
if (status == AWS_TASK_STATUS_RUN_READY) {
struct aws_event_stream_message *message = message_data->message;
struct aws_event_stream_channel_handler *handler = message_data->handler;
struct aws_byte_cursor message_cur = aws_byte_cursor_from_array(
aws_event_stream_message_buffer(message), aws_event_stream_message_total_length(message));
while (message_cur.len) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: writing message chunk of size %zu.",
(void *)&handler->handler,
message_cur.len);
/* io messages from the pool are allowed to be smaller than the requested size. */
struct aws_io_message *io_message = aws_channel_acquire_message_from_pool(
handler->handler.slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, message_cur.len);
if (!io_message) {
int error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: Error occurred while acquiring io message %s.",
(void *)&handler->handler,
aws_error_debug_str(error_code));
message_data->on_message_written(message, error_code, message_data->user_data);
aws_mem_release(message_data->allocator, message_data);
aws_channel_shutdown(handler->handler.slot->channel, error_code);
break;
}
aws_byte_buf_write_to_capacity(&io_message->message_data, &message_cur);
/* if that was the end of the buffer we want to write, attach the completion callback to that io message */
if (message_cur.len == 0) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: Message completely written to all io buffers.",
(void *)&handler->handler);
io_message->on_completion = s_on_message_write_completed_fn;
io_message->user_data = message_data;
}
/* note if this fails the io message will not be queued and as a result will not have it's completion
* callback invoked. */
if (aws_channel_slot_send_message(handler->handler.slot, io_message, AWS_CHANNEL_DIR_WRITE)) {
aws_mem_release(io_message->allocator, io_message);
int error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: Error occurred while sending message to channel %s.",
(void *)&handler->handler,
aws_error_debug_str(error_code));
message_data->on_message_written(message, error_code, message_data->user_data);
aws_mem_release(message_data->allocator, message_data);
aws_channel_shutdown(handler->handler.slot->channel, error_code);
break;
}
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: Message sent to channel", (void *)&handler->handler);
}
} else {
AWS_LOGF_WARN(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "static: Channel was shutdown. Message not sent");
message_data->on_message_written(
message_data->message, AWS_ERROR_IO_OPERATION_CANCELLED, message_data->user_data);
aws_mem_release(message_data->allocator, message_data);
}
}