static void s_write_handler_message()

in source/event_stream_channel_handler.c [251:325]


static void s_write_handler_message(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
    (void)task;

    struct message_write_data *message_data = arg;

    AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "static: Write message task invoked.");
    if (status == AWS_TASK_STATUS_RUN_READY) {
        struct aws_event_stream_message *message = message_data->message;
        struct aws_event_stream_channel_handler *handler = message_data->handler;

        struct aws_byte_cursor message_cur = aws_byte_cursor_from_array(
            aws_event_stream_message_buffer(message), aws_event_stream_message_total_length(message));

        while (message_cur.len) {
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                "id=%p: writing message chunk of size %zu.",
                (void *)&handler->handler,
                message_cur.len);

            /* io messages from the pool are allowed to be smaller than the requested size. */
            struct aws_io_message *io_message = aws_channel_acquire_message_from_pool(
                handler->handler.slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, message_cur.len);

            if (!io_message) {
                int error_code = aws_last_error();
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                    "id=%p: Error occurred while acquiring io message %s.",
                    (void *)&handler->handler,
                    aws_error_debug_str(error_code));

                message_data->on_message_written(message, error_code, message_data->user_data);
                aws_mem_release(message_data->allocator, message_data);
                aws_channel_shutdown(handler->handler.slot->channel, error_code);
                break;
            }

            aws_byte_buf_write_to_capacity(&io_message->message_data, &message_cur);

            /* if that was the end of the buffer we want to write, attach the completion callback to that io message */
            if (message_cur.len == 0) {
                AWS_LOGF_TRACE(
                    AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                    "id=%p: Message completely written to all io buffers.",
                    (void *)&handler->handler);
                io_message->on_completion = s_on_message_write_completed_fn;
                io_message->user_data = message_data;
            }

            /* note if this fails the io message will not be queued and as a result will not have it's completion
             * callback invoked. */
            if (aws_channel_slot_send_message(handler->handler.slot, io_message, AWS_CHANNEL_DIR_WRITE)) {
                aws_mem_release(io_message->allocator, io_message);
                int error_code = aws_last_error();
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                    "id=%p: Error occurred while sending message to channel %s.",
                    (void *)&handler->handler,
                    aws_error_debug_str(error_code));
                message_data->on_message_written(message, error_code, message_data->user_data);
                aws_mem_release(message_data->allocator, message_data);
                aws_channel_shutdown(handler->handler.slot->channel, error_code);
                break;
            }
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: Message sent to channel", (void *)&handler->handler);
        }
    } else {
        AWS_LOGF_WARN(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "static: Channel was shutdown. Message not sent");
        message_data->on_message_written(
            message_data->message, AWS_ERROR_IO_OPERATION_CANCELLED, message_data->user_data);
        aws_mem_release(message_data->allocator, message_data);
    }
}