in source/event_stream_rpc_client.c [808:869]
static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data) {
if (!error_code) {
struct aws_event_stream_rpc_client_connection *connection = user_data;
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: message received on connection of length %" PRIu32,
(void *)connection,
aws_event_stream_message_total_length(message));
struct aws_array_list headers;
if (aws_array_list_init_dynamic(
&headers, connection->allocator, 8, sizeof(struct aws_event_stream_header_value_pair))) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: error initializing headers %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
return;
}
if (aws_event_stream_message_headers(message, &headers)) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: error fetching headers %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
goto clean_up;
}
int32_t stream_id = -1;
int32_t message_type = -1;
int32_t message_flags = -1;
struct aws_byte_buf operation_name_buf;
AWS_ZERO_STRUCT(operation_name_buf);
if (aws_event_stream_rpc_extract_message_metadata(
&headers, &stream_id, &message_type, &message_flags, &operation_name_buf)) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: invalid protocol message with error %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_error);
goto clean_up;
}
(void)operation_name_buf;
AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: routing message", (void *)connection);
s_route_message_by_type(connection, message, &headers, stream_id, message_type, message_flags);
clean_up:
aws_event_stream_headers_list_cleanup(&headers);
}
}