static int s_process_read_message()

in source/event_stream_channel_handler.c [47:222]


static int s_process_read_message(
    struct aws_channel_handler *handler,
    struct aws_channel_slot *slot,
    struct aws_io_message *message) {

    AWS_LOGF_TRACE(
        AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
        "id=%p: received message of size %zu",
        (void *)handler,
        message->message_data.len);
    struct aws_event_stream_channel_handler *event_stream_handler = handler->impl;

    struct aws_byte_cursor message_cursor = aws_byte_cursor_from_buf(&message->message_data);

    int error_code = AWS_ERROR_SUCCESS;
    while (message_cursor.len) {
        AWS_LOGF_TRACE(
            AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
            "id=%p: processing chunk of size %zu",
            (void *)handler,
            message_cursor.len);

        /* first read only the prelude so we can do checks before reading the entire buffer. */
        if (event_stream_handler->message_buf.len < AWS_EVENT_STREAM_PRELUDE_LENGTH) {
            size_t remaining_prelude = AWS_EVENT_STREAM_PRELUDE_LENGTH - event_stream_handler->message_buf.len;
            size_t to_copy = aws_min_size(message_cursor.len, remaining_prelude);
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                "id=%p: processing prelude, %zu bytes of an expected 12.",
                (void *)handler,
                to_copy);

            if (!aws_byte_buf_write(&event_stream_handler->message_buf, message_cursor.ptr, to_copy)) {
                error_code = aws_last_error();
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                    "id=%p: writing to prelude buffer failed with error %s",
                    (void *)handler,
                    aws_error_debug_str(error_code));
                goto finished;
            }

            aws_byte_cursor_advance(&message_cursor, to_copy);
        }

        /* we need to get the prelude so we can get the message length to know how much to read and also
         * to check the prelude CRC to protect against bit-flips causing us to read to much memory */
        if (event_stream_handler->message_buf.len == AWS_EVENT_STREAM_PRELUDE_LENGTH) {
            AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: processing prelude buffer", (void *)handler);

            struct aws_byte_cursor prelude_cursor = aws_byte_cursor_from_buf(&event_stream_handler->message_buf);

            event_stream_handler->running_crc =
                aws_checksums_crc32(prelude_cursor.ptr, sizeof(uint32_t) + sizeof(uint32_t), 0);
            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                "id=%p: calculated prelude CRC of %" PRIu32,
                (void *)handler,
                event_stream_handler->running_crc);

            aws_byte_cursor_read_be32(&prelude_cursor, &event_stream_handler->current_message_len);

            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                "id=%p: read total message length of %" PRIu32,
                (void *)handler,
                event_stream_handler->current_message_len);
            if (event_stream_handler->current_message_len > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                    "id=%p: message length of %" PRIu32 " exceeds the max size of %zu",
                    (void *)handler,
                    event_stream_handler->current_message_len,
                    (size_t)AWS_EVENT_STREAM_MAX_MESSAGE_SIZE);
                aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
                error_code = aws_last_error();
                goto finished;
            }

            /* advance past the headers field since we don't really care about it at this point */
            aws_byte_cursor_advance(&prelude_cursor, sizeof(uint32_t));

            uint32_t prelude_crc = 0;
            aws_byte_cursor_read_be32(&prelude_cursor, &prelude_crc);
            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                "id=%p: read prelude CRC of %" PRIu32,
                (void *)handler,
                prelude_crc);

            /* make sure the checksum matches before processing any further */
            if (event_stream_handler->running_crc != prelude_crc) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                    "id=%p: prelude CRC mismatch. calculated %" PRIu32 " but the crc for the message was %" PRIu32,
                    (void *)handler,
                    event_stream_handler->running_crc,
                    prelude_crc);
                aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
                error_code = aws_last_error();
                goto finished;
            }
        }

        /* read whatever is remaining from the message */
        if (event_stream_handler->message_buf.len < event_stream_handler->current_message_len) {
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: processing remaining message buffer", (void *)handler);
            size_t remaining = event_stream_handler->current_message_len - event_stream_handler->message_buf.len;
            size_t to_copy = aws_min_size(message_cursor.len, remaining);
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                "id=%p: of the remaining %zu, processing %zu from the "
                "current message.",
                (void *)handler,
                remaining,
                to_copy);

            struct aws_byte_cursor to_append = aws_byte_cursor_advance(&message_cursor, to_copy);
            if (aws_byte_buf_append_dynamic(&event_stream_handler->message_buf, &to_append)) {
                error_code = aws_last_error();
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                    "id=%p: Appending to the message buffer failed with error %s.",
                    (void *)handler,
                    aws_error_debug_str(error_code));

                goto finished;
            }
        }

        /* If we read the entire message, parse it and give it back to the subscriber. Keep in mind, once we're to this
         * point the aws_event_stream API handles the rest of the message parsing and validation. */
        if (event_stream_handler->message_buf.len == event_stream_handler->current_message_len) {
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                "id=%p: An entire message has been read. Parsing the message now.",
                (void *)handler);
            struct aws_event_stream_message received_message;
            AWS_ZERO_STRUCT(received_message);

            if (aws_event_stream_message_from_buffer(
                    &received_message, event_stream_handler->handler.alloc, &event_stream_handler->message_buf)) {
                error_code = aws_last_error();
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
                    "id=%p: Parsing the message failed with error %s.",
                    (void *)handler,
                    aws_error_debug_str(error_code));
                goto finished;
            }

            size_t message_size = event_stream_handler->message_buf.len;
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: Invoking on_message_received callback.", (void *)handler);
            event_stream_handler->on_message_received(
                &received_message, AWS_ERROR_SUCCESS, event_stream_handler->user_data);
            aws_event_stream_message_clean_up(&received_message);
            event_stream_handler->current_message_len = 0;
            event_stream_handler->running_crc = 0;
            aws_byte_buf_reset(&event_stream_handler->message_buf, true);

            if (!event_stream_handler->manual_window_management) {
                aws_channel_slot_increment_read_window(slot, message_size);
            }
        }
    }

finished:
    if (error_code) {
        event_stream_handler->on_message_received(NULL, error_code, event_stream_handler->user_data);
        aws_channel_shutdown(slot->channel, error_code);
    }
    aws_mem_release(message->allocator, message);
    return AWS_OP_SUCCESS;
}