in source/event_stream_channel_handler.c [47:222]
static int s_process_read_message(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
struct aws_io_message *message) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: received message of size %zu",
(void *)handler,
message->message_data.len);
struct aws_event_stream_channel_handler *event_stream_handler = handler->impl;
struct aws_byte_cursor message_cursor = aws_byte_cursor_from_buf(&message->message_data);
int error_code = AWS_ERROR_SUCCESS;
while (message_cursor.len) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: processing chunk of size %zu",
(void *)handler,
message_cursor.len);
/* first read only the prelude so we can do checks before reading the entire buffer. */
if (event_stream_handler->message_buf.len < AWS_EVENT_STREAM_PRELUDE_LENGTH) {
size_t remaining_prelude = AWS_EVENT_STREAM_PRELUDE_LENGTH - event_stream_handler->message_buf.len;
size_t to_copy = aws_min_size(message_cursor.len, remaining_prelude);
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: processing prelude, %zu bytes of an expected 12.",
(void *)handler,
to_copy);
if (!aws_byte_buf_write(&event_stream_handler->message_buf, message_cursor.ptr, to_copy)) {
error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: writing to prelude buffer failed with error %s",
(void *)handler,
aws_error_debug_str(error_code));
goto finished;
}
aws_byte_cursor_advance(&message_cursor, to_copy);
}
/* we need to get the prelude so we can get the message length to know how much to read and also
* to check the prelude CRC to protect against bit-flips causing us to read to much memory */
if (event_stream_handler->message_buf.len == AWS_EVENT_STREAM_PRELUDE_LENGTH) {
AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: processing prelude buffer", (void *)handler);
struct aws_byte_cursor prelude_cursor = aws_byte_cursor_from_buf(&event_stream_handler->message_buf);
event_stream_handler->running_crc =
aws_checksums_crc32(prelude_cursor.ptr, sizeof(uint32_t) + sizeof(uint32_t), 0);
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: calculated prelude CRC of %" PRIu32,
(void *)handler,
event_stream_handler->running_crc);
aws_byte_cursor_read_be32(&prelude_cursor, &event_stream_handler->current_message_len);
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: read total message length of %" PRIu32,
(void *)handler,
event_stream_handler->current_message_len);
if (event_stream_handler->current_message_len > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: message length of %" PRIu32 " exceeds the max size of %zu",
(void *)handler,
event_stream_handler->current_message_len,
(size_t)AWS_EVENT_STREAM_MAX_MESSAGE_SIZE);
aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
error_code = aws_last_error();
goto finished;
}
/* advance past the headers field since we don't really care about it at this point */
aws_byte_cursor_advance(&prelude_cursor, sizeof(uint32_t));
uint32_t prelude_crc = 0;
aws_byte_cursor_read_be32(&prelude_cursor, &prelude_crc);
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: read prelude CRC of %" PRIu32,
(void *)handler,
prelude_crc);
/* make sure the checksum matches before processing any further */
if (event_stream_handler->running_crc != prelude_crc) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: prelude CRC mismatch. calculated %" PRIu32 " but the crc for the message was %" PRIu32,
(void *)handler,
event_stream_handler->running_crc,
prelude_crc);
aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
error_code = aws_last_error();
goto finished;
}
}
/* read whatever is remaining from the message */
if (event_stream_handler->message_buf.len < event_stream_handler->current_message_len) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: processing remaining message buffer", (void *)handler);
size_t remaining = event_stream_handler->current_message_len - event_stream_handler->message_buf.len;
size_t to_copy = aws_min_size(message_cursor.len, remaining);
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: of the remaining %zu, processing %zu from the "
"current message.",
(void *)handler,
remaining,
to_copy);
struct aws_byte_cursor to_append = aws_byte_cursor_advance(&message_cursor, to_copy);
if (aws_byte_buf_append_dynamic(&event_stream_handler->message_buf, &to_append)) {
error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: Appending to the message buffer failed with error %s.",
(void *)handler,
aws_error_debug_str(error_code));
goto finished;
}
}
/* If we read the entire message, parse it and give it back to the subscriber. Keep in mind, once we're to this
* point the aws_event_stream API handles the rest of the message parsing and validation. */
if (event_stream_handler->message_buf.len == event_stream_handler->current_message_len) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: An entire message has been read. Parsing the message now.",
(void *)handler);
struct aws_event_stream_message received_message;
AWS_ZERO_STRUCT(received_message);
if (aws_event_stream_message_from_buffer(
&received_message, event_stream_handler->handler.alloc, &event_stream_handler->message_buf)) {
error_code = aws_last_error();
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
"id=%p: Parsing the message failed with error %s.",
(void *)handler,
aws_error_debug_str(error_code));
goto finished;
}
size_t message_size = event_stream_handler->message_buf.len;
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: Invoking on_message_received callback.", (void *)handler);
event_stream_handler->on_message_received(
&received_message, AWS_ERROR_SUCCESS, event_stream_handler->user_data);
aws_event_stream_message_clean_up(&received_message);
event_stream_handler->current_message_len = 0;
event_stream_handler->running_crc = 0;
aws_byte_buf_reset(&event_stream_handler->message_buf, true);
if (!event_stream_handler->manual_window_management) {
aws_channel_slot_increment_read_window(slot, message_size);
}
}
}
finished:
if (error_code) {
event_stream_handler->on_message_received(NULL, error_code, event_stream_handler->user_data);
aws_channel_shutdown(slot->channel, error_code);
}
aws_mem_release(message->allocator, message);
return AWS_OP_SUCCESS;
}