in source/client_channel_handler.c [465:582]
/**
 * Channel read callback: splits an incoming io message into MQTT packets and dispatches each one
 * through s_process_mqtt_packet().
 *
 * A packet may span several io messages. The bytes received so far are kept in
 * connection->thread_data.pending_packet, whose capacity is the size the packet is expected to
 * have and whose len is how much of it has arrived. Each call first tries to finish that pending
 * packet with the new data, then walks whatever is left of the message packet by packet.
 *
 * \param handler  Channel handler; handler->impl is the aws_mqtt_client_connection.
 * \param slot     Slot whose read window is re-opened once the message has been consumed.
 * \param message  Message to consume. It must be AWS_IO_MESSAGE_APPLICATION_DATA and non-empty.
 *
 * \return AWS_OP_SUCCESS once the message has been consumed (every complete packet dispatched,
 *         any trailing partial packet stashed); the read window is then incremented by the
 *         message length and the message is released.
 *         AWS_OP_ERR if the message is of the wrong type or empty, if buffering a partial packet
 *         fails, if the fixed header fails to decode for any reason other than
 *         AWS_ERROR_SHORT_BUFFER, or if s_process_mqtt_packet() reports failure. On these paths
 *         the function returns without releasing the message or touching the read window.
 */
static int s_process_read_message(
    struct aws_channel_handler *handler,
    struct aws_channel_slot *slot,
    struct aws_io_message *message) {

    struct aws_mqtt_client_connection *connection = handler->impl;

    if (message->message_type != AWS_IO_MESSAGE_APPLICATION_DATA || message->message_data.len < 1) {
        return AWS_OP_ERR;
    }

    AWS_LOGF_TRACE(
        AWS_LS_MQTT_CLIENT,
        "id=%p: precessing read message of size %zu",
        (void *)connection,
        message->message_data.len);

    /* This cursor will be updated as we read through the message. */
    struct aws_byte_cursor message_cursor = aws_byte_cursor_from_buf(&message->message_data);

    /* If there's a pending packet left over from last time, attempt to complete it. */
    if (connection->thread_data.pending_packet.len) {
        int result = AWS_OP_SUCCESS;

        /* How much to take from the message: min(bytes still missing, message length). */
        size_t to_read = connection->thread_data.pending_packet.capacity - connection->thread_data.pending_packet.len;
        /* Set to false if this message still won't complete the pending packet. */
        bool packet_complete = true;
        if (to_read > message_cursor.len) {
            to_read = message_cursor.len;
            packet_complete = false;
        }

        /* Write the chunk to the buffer.
         * This will either complete the packet, or be the entirety of message if more data is required. */
        struct aws_byte_cursor chunk = aws_byte_cursor_advance(&message_cursor, to_read);
        AWS_ASSERT(chunk.ptr); /* Guaranteed to be in bounds */
        if (!aws_byte_buf_write_from_whole_cursor(&connection->thread_data.pending_packet, chunk)) {
            result = AWS_OP_ERR;
            goto handle_error;
        }

        /* If the packet is still incomplete, keep it buffered and wait for more data. */
        if (!packet_complete) {
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: partial message is still incomplete, waiting on another read.",
                (void *)connection);

            goto cleanup;
        }

        /* Handle the completed pending packet */
        struct aws_byte_cursor packet_data = aws_byte_cursor_from_buf(&connection->thread_data.pending_packet);
        AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: full mqtt packet re-assembled, dispatching.", (void *)connection);
        result = s_process_mqtt_packet(connection, aws_mqtt_get_packet_type(packet_data.ptr), packet_data);

    handle_error:
        /* The pending packet is finished with, whether it was dispatched or the write failed. */
        aws_byte_buf_clean_up(&connection->thread_data.pending_packet);
        AWS_ZERO_STRUCT(connection->thread_data.pending_packet);

        if (result) {
            return AWS_OP_ERR;
        }
    }

    while (message_cursor.len) {

        /* Temp byte cursor so we can decode the header without advancing message_cursor. */
        struct aws_byte_cursor header_decode = message_cursor;

        struct aws_mqtt_fixed_header packet_header;
        AWS_ZERO_STRUCT(packet_header);
        int result = aws_mqtt_fixed_header_decode(&header_decode, &packet_header);

        /* Number of bytes the header decode consumed. */
        const size_t fixed_header_size = message_cursor.len - header_decode.len;

        if (result) {
            if (aws_last_error() != AWS_ERROR_SHORT_BUFFER) {
                return AWS_OP_ERR;
            }

            /* Message data too short, store data and come back later. */
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT, "id=%p: message is incomplete, waiting on another read.", (void *)connection);

            /* NOTE(review): packet_header was zeroed before decoding, so if the decode stopped before
             * filling in remaining_length this capacity may be smaller than the real packet size.
             * Confirm against aws_mqtt_fixed_header_decode. */
            if (aws_byte_buf_init(
                    &connection->thread_data.pending_packet,
                    connection->allocator,
                    fixed_header_size + packet_header.remaining_length)) {

                return AWS_OP_ERR;
            }

            /* Write the partial packet. */
            if (!aws_byte_buf_write_from_whole_cursor(&connection->thread_data.pending_packet, message_cursor)) {
                aws_byte_buf_clean_up(&connection->thread_data.pending_packet);
                return AWS_OP_ERR;
            }

            /* A short buffer is an expected condition here, not a failure. */
            aws_reset_error();
            goto cleanup;
        }

        const size_t packet_size = fixed_header_size + packet_header.remaining_length;
        struct aws_byte_cursor packet_data = aws_byte_cursor_advance(&message_cursor, packet_size);
        if (packet_data.len == 0) {
            /* Nothing was consumed, so message_cursor is unchanged and the loop could never
             * terminate. Treat this as a malformed message instead of spinning. */
            return AWS_OP_ERR;
        }

        AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: full mqtt packet read, dispatching.", (void *)connection);

        /* Propagate handler failures, matching the re-assembled packet path above. */
        if (s_process_mqtt_packet(connection, packet_header.packet_type, packet_data)) {
            return AWS_OP_ERR;
        }
    }

cleanup:
    /* The whole message has been consumed: re-open the read window and release the message. */
    aws_channel_slot_increment_read_window(slot, message->message_data.len);
    aws_mem_release(message->allocator, message);

    return AWS_OP_SUCCESS;
}