int aws_event_stream_rpc_extract_message_metadata()

in source/event_stream_rpc.c [17:95]


int aws_event_stream_rpc_extract_message_metadata(
    const struct aws_array_list *message_headers,
    int32_t *stream_id,
    int32_t *message_type,
    int32_t *message_flags,
    struct aws_byte_buf *operation_name) {
    size_t length = aws_array_list_length(message_headers);
    bool message_type_found = 0;
    bool message_flags_found = 0;
    bool stream_id_found = 0;
    bool operation_name_found = 0;

    AWS_LOGF_TRACE(
        AWS_LS_EVENT_STREAM_GENERAL, "processing message headers for rpc protocol. %zu headers to process.", length);

    for (size_t i = 0; i < length; ++i) {
        struct aws_event_stream_header_value_pair *header = NULL;
        aws_array_list_get_at_ptr(message_headers, (void **)&header, i);
        struct aws_byte_buf name_buf = aws_event_stream_header_name(header);
        AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_GENERAL, "processing header name " PRInSTR, AWS_BYTE_BUF_PRI(name_buf));

        /* check type first since that's cheaper than a string compare */
        if (header->header_value_type == AWS_EVENT_STREAM_HEADER_INT32) {

            struct aws_byte_buf stream_id_field = aws_byte_buf_from_array(
                aws_event_stream_rpc_stream_id_name.ptr, aws_event_stream_rpc_stream_id_name.len);
            if (aws_byte_buf_eq_ignore_case(&name_buf, &stream_id_field)) {
                *stream_id = aws_event_stream_header_value_as_int32(header);
                AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_GENERAL, "stream id header value %" PRId32, *stream_id);
                stream_id_found += 1;
                goto found;
            }

            struct aws_byte_buf message_type_field = aws_byte_buf_from_array(
                aws_event_stream_rpc_message_type_name.ptr, aws_event_stream_rpc_message_type_name.len);
            if (aws_byte_buf_eq_ignore_case(&name_buf, &message_type_field)) {
                *message_type = aws_event_stream_header_value_as_int32(header);
                AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_GENERAL, "message type header value %" PRId32, *message_type);
                message_type_found += 1;
                goto found;
            }

            struct aws_byte_buf message_flags_field = aws_byte_buf_from_array(
                aws_event_stream_rpc_message_flags_name.ptr, aws_event_stream_rpc_message_flags_name.len);
            if (aws_byte_buf_eq_ignore_case(&name_buf, &message_flags_field)) {
                *message_flags = aws_event_stream_header_value_as_int32(header);
                AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_GENERAL, "message flags header value %" PRId32, *message_flags);
                message_flags_found += 1;
                goto found;
            }
        }

        if (header->header_value_type == AWS_EVENT_STREAM_HEADER_STRING) {
            struct aws_byte_buf operation_field = aws_byte_buf_from_array(
                aws_event_stream_rpc_operation_name.ptr, aws_event_stream_rpc_operation_name.len);

            if (aws_byte_buf_eq_ignore_case(&name_buf, &operation_field)) {
                *operation_name = aws_event_stream_header_value_as_string(header);
                AWS_LOGF_DEBUG(
                    AWS_LS_EVENT_STREAM_GENERAL,
                    "operation name header value" PRInSTR,
                    AWS_BYTE_BUF_PRI(*operation_name));
                operation_name_found += 1;
                goto found;
            }
        }

        continue;

    found:
        if (message_flags_found && message_type_found && stream_id_found && operation_name_found) {
            return AWS_OP_SUCCESS;
        }
    }

    return message_flags_found && message_type_found && stream_id_found
               ? AWS_OP_SUCCESS
               : aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
}