static int s_headers_state()

in source/event_stream.c [879:953]


/* Forward declaration: s_read_header_value() hands control back to this state function (by storing it in
 * decoder->state) once a header value has been delivered, so the name must be visible before that definition. */
static int s_headers_state(
    struct aws_event_stream_streaming_decoder *decoder,
    const uint8_t *data,
    size_t len,
    size_t *processed);

/**
 * Decoder state that consumes the value bytes of the header currently being parsed.
 *
 * Consumes at most `len` bytes from `data` (never more than what is still missing from the value), folds them into
 * decoder->running_crc and advances both `*processed` and decoder->message_pos by the number of bytes consumed.
 * Once header_value_len bytes have been seen, decoder->on_header is invoked, the header state is reset and the
 * decoder transitions back to s_headers_state.
 *
 * Returns AWS_OP_SUCCESS, or the result of aws_raise_error(AWS_ERROR_OOM) when the buffer needed to accumulate a
 * partially received BYTE_BUF/STRING value cannot be acquired.
 */
static int s_read_header_value(
    struct aws_event_stream_streaming_decoder *decoder,
    const uint8_t *data,
    size_t len,
    size_t *processed) {

    size_t current_pos = decoder->message_pos;

    /* number of value bytes already consumed by earlier invocations of this state */
    size_t length_read = current_pos - decoder->current_header_value_offset;
    struct aws_event_stream_header_value_pair *current_header = &decoder->current_header;

    /* BYTE_BUF and STRING values live behind header_value.variable_len_val; every other type is stored inline in
     * header_value.static_val. */
    const int is_variable_len = current_header->header_value_type == AWS_EVENT_STREAM_HEADER_BYTE_BUF ||
                                current_header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING;

    if (!length_read) {
        /* the whole value is available in this chunk, so no intermediate buffer has to be allocated. */
        if (len >= current_header->header_value_len) {
            if (is_variable_len) {
                /* zero-copy: point straight at the caller's data. This expects the user is living by the contract
                 * that they cannot act like they own this memory beyond the lifetime of their callback, and they
                 * should not mutate it */
                current_header->header_value.variable_len_val = (uint8_t *)data;
            } else {
                /* fixed-size values are read back through the inline static_val storage; writing the pointer member
                 * of the union would leave an address there instead of the value, so copy the bytes in. */
                memcpy(current_header->header_value.static_val, data, current_header->header_value_len);
            }

            current_header->value_owned = 0;
            decoder->on_header(decoder, &decoder->prelude, &decoder->current_header, decoder->user_context);
            *processed += current_header->header_value_len;
            decoder->message_pos += current_header->header_value_len;
            decoder->running_crc =
                aws_checksums_crc32(data, (int)current_header->header_value_len, decoder->running_crc);

            s_reset_header_state(decoder, 1);
            decoder->state = s_headers_state;
            return AWS_OP_SUCCESS;
        }

        /* the value spans several chunks: variable length values need a buffer to be assembled in.
         * a possible optimization later would be to only allocate this once, and then keep reusing the same buffer
         * for subsequent messages. */
        if (is_variable_len) {
            current_header->header_value.variable_len_val =
                aws_mem_acquire(decoder->alloc, decoder->current_header.header_value_len);

            if (!current_header->header_value.variable_len_val) {
                return aws_raise_error(AWS_ERROR_OOM);
            }

            current_header->value_owned = 1;
        }
    }

    /* never consume past the end of the value, nor past the end of the input. */
    size_t remaining = current_header->header_value_len - length_read;
    size_t max_read = len >= remaining ? remaining : len;

    uint8_t *value_dest =
        is_variable_len ? current_header->header_value.variable_len_val : current_header->header_value.static_val;

    memcpy(value_dest + length_read, data, max_read);
    decoder->running_crc = aws_checksums_crc32(data, (int)max_read, decoder->running_crc);

    *processed += max_read;
    decoder->message_pos += max_read;
    length_read += max_read;

    /* value complete: hand it to the user and go back to parsing headers. */
    if (length_read == current_header->header_value_len) {
        decoder->on_header(decoder, &decoder->prelude, current_header, decoder->user_context);
        s_reset_header_state(decoder, 1);
        decoder->state = s_headers_state;
    }

    return AWS_OP_SUCCESS;
}