in source/event_stream_rpc_server.c [1049:1115]
static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data) {
if (!error_code) {
struct aws_event_stream_rpc_server_connection *connection = user_data;
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: message received on connection of length %" PRIu32,
(void *)connection,
aws_event_stream_message_total_length(message));
struct aws_array_list headers;
if (aws_array_list_init_dynamic(
&headers, connection->allocator, 8, sizeof(struct aws_event_stream_header_value_pair))) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: error initializing headers %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
return;
}
if (aws_event_stream_message_headers(message, &headers)) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: error fetching headers %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
goto clean_up;
}
int32_t stream_id = -1;
int32_t message_type = -1;
int32_t message_flags = -1;
struct aws_byte_buf operation_name_buf;
AWS_ZERO_STRUCT(operation_name_buf);
if (aws_event_stream_rpc_extract_message_metadata(
&headers, &stream_id, &message_type, &message_flags, &operation_name_buf)) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: invalid protocol message with error %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_error);
goto clean_up;
}
AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: routing message", (void *)connection);
s_route_message_by_type(
connection,
message,
&headers,
stream_id,
message_type,
message_flags,
aws_byte_cursor_from_buf(&operation_name_buf));
clean_up:
aws_event_stream_headers_list_cleanup(&headers);
}
}