in source/event_stream_rpc_server.c [808:1046]
/**
 * Validates an incoming message against the connection's handshake state and stream-id rules, then hands it to
 * the matching callback.
 *
 * Routing, as implemented below:
 *  - Any message other than CONNECT that arrives before the handshake state reaches
 *    CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED is rejected with a connection-level protocol error.
 *  - stream_id > 0: the message must be an application message. A stream id at or below
 *    connection->latest_stream_id is looked up in connection->continuation_table and delivered to that
 *    continuation's continuation_fn. A stream id equal to latest_stream_id + 1 creates a new continuation,
 *    invokes connection->on_incoming_stream to obtain its handlers, then delivers the message. Any other stream
 *    id is a protocol error. If AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM is set, the continuation is
 *    marked closed and removed from the table.
 *  - stream_id == 0: the message must be a connection-level type; it is delivered to
 *    connection->on_connection_protocol_message. A CONNECT message additionally advances the handshake state and
 *    is rejected if the handshake state is already non-zero.
 *
 * Every rejection is reported to the peer via s_send_connection_level_error() and the function returns without
 * invoking a message callback.
 *
 * @param connection     connection the message arrived on.
 * @param message        decoded message; only its payload pointer and length are read here.
 * @param headers_list   headers of the message, exposed to callbacks through the message args.
 * @param stream_id      stream id of the message; zero selects connection-level handling.
 * @param message_type   message type of the message.
 * @param message_flags  message flags of the message.
 * @param operation_name operation name of the message; must be non-empty for a new stream.
 */
static void s_route_message_by_type(
    struct aws_event_stream_rpc_server_connection *connection,
    struct aws_event_stream_message *message,
    struct aws_array_list *headers_list,
    uint32_t stream_id,
    uint32_t message_type,
    uint32_t message_flags,
    struct aws_byte_cursor operation_name) {

    /* payload_buf wraps the message's payload in place; it (and message_args) only live for this call. */
    struct aws_byte_buf payload_buf = aws_byte_buf_from_array(
        aws_event_stream_message_payload(message), aws_event_stream_message_payload_len(message));

    struct aws_event_stream_rpc_message_args message_args = {
        .headers = headers_list->data,
        .headers_count = aws_array_list_length(headers_list),
        .payload = &payload_buf,
        .message_flags = message_flags,
        .message_type = message_type,
    };

    size_t handshake_state = aws_atomic_load_int(&connection->handshake_state);

    /* make sure if this is not a CONNECT message being received, the handshake has been completed. */
    if (handshake_state < CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
        message_type != AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_SERVER,
            "id=%p: a message was received on this connection prior to the "
            "connect handshake completing",
            (void *)connection);
        aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
        s_send_connection_level_error(
            connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
        return;
    }

    /* stream_id being non zero ALWAYS indicates APPLICATION_DATA or APPLICATION_ERROR. */
    if (stream_id > 0) {
        AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: stream id %" PRIu32, (void *)connection, stream_id);
        struct aws_event_stream_rpc_server_continuation_token *continuation = NULL;

        if (message_type > AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR) {
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: only application messages can be sent on a stream id, "
                "but this message is the incorrect type",
                (void *)connection);
            aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
            return;
        }

        /* INT32_MAX is the max stream id. */
        if (stream_id > INT32_MAX) {
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id is larger than the max acceptable value",
                (void *)connection);
            aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
            return;
        }

        /* if the stream id is in the past, look it up from the continuation table. If it's not there, that's an
         * error. If it is, find it and notify the user a message arrived */
        if (stream_id <= connection->latest_stream_id) {
            /* This is the expected path for every follow-up message on an existing stream, so it is logged at
             * TRACE (matching the new-stream branch below) rather than as an error. */
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id is an already seen stream_id, looking for existing continuation",
                (void *)connection);

            struct aws_hash_element *continuation_element = NULL;
            if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
                !continuation_element) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: stream_id does not have a corresponding continuation",
                    (void *)connection);
                aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
                return;
            }

            continuation = continuation_element->value;
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id corresponds to continuation %p",
                (void *)connection,
                (void *)continuation);

            /* This reference is held across the callback AND the terminate handling below; it is released once
             * at the end of the stream branch. */
            aws_event_stream_rpc_server_continuation_acquire(continuation);
            continuation->continuation_fn(continuation, &message_args, continuation->user_data);
        } else {
            /* now these are potentially new streams. Make sure they're in bounds, create a new continuation
             * and notify the user the stream has been created, then send them the message. */
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id is unknown, attempting to create a continuation for it",
                (void *)connection);

            if (stream_id != connection->latest_stream_id + 1) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: stream_id is invalid because it's not sequentially increasing",
                    (void *)connection);
                aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
                s_send_connection_level_error(
                    connection,
                    AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR,
                    0,
                    &s_invalid_new_client_stream_id_error);
                return;
            }

            /* new streams must always have an operation name. */
            if (operation_name.len == 0) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: new stream_id encountered, but an operation name was not received",
                    (void *)connection);
                aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_missing_operation_name_error);
                return;
            }

            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id is a valid new stream. Creating continuation",
                (void *)connection);
            continuation =
                aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_event_stream_rpc_server_continuation_token));
            if (!continuation) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: continuation allocation failed with error %s",
                    (void *)connection,
                    aws_error_debug_str(aws_last_error()));
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
                return;
            }

            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: new continuation is %p",
                (void *)connection,
                (void *)continuation);
            continuation->stream_id = stream_id;
            continuation->connection = connection;
            aws_event_stream_rpc_server_connection_acquire(continuation->connection);
            /* the initial reference (count of 1) is the one handed to the continuation table by the put below. */
            aws_atomic_init_int(&continuation->ref_count, 1);

            /* the table is keyed by the address of the continuation's own stream_id field. */
            if (aws_hash_table_put(&connection->continuation_table, &continuation->stream_id, continuation, NULL)) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: continuation table update failed with error %s",
                    (void *)connection,
                    aws_error_debug_str(aws_last_error()));
                /* continuation release will drop the connection reference as well */
                aws_event_stream_rpc_server_continuation_release(continuation);
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
                return;
            }

            struct aws_event_stream_rpc_server_stream_continuation_options options;
            AWS_ZERO_STRUCT(options);

            /* This reference is held across both callbacks AND the terminate handling below; it is released
             * once at the end of the stream branch (or right away if on_incoming_stream fails). */
            aws_event_stream_rpc_server_continuation_acquire(continuation);
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: invoking on_incoming_stream callback", (void *)connection);
            if (connection->on_incoming_stream(
                    continuation->connection, continuation, operation_name, &options, connection->user_data)) {
                aws_event_stream_rpc_server_continuation_release(continuation);
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
                return;
            }

            /* a successful on_incoming_stream must have filled in both handlers. */
            AWS_FATAL_ASSERT(options.on_continuation);
            AWS_FATAL_ASSERT(options.on_continuation_closed);

            continuation->continuation_fn = options.on_continuation;
            continuation->closed_fn = options.on_continuation_closed;
            continuation->user_data = options.user_data;

            /* only advance the high-water mark once the stream has been accepted. */
            connection->latest_stream_id = stream_id;
            continuation->continuation_fn(continuation, &message_args, continuation->user_data);
        }

        /* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
        if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: the terminate_stream flag was received for continuation %p, closing",
                (void *)connection,
                (void *)continuation);
            aws_atomic_store_int(&continuation->is_closed, 1U);
            aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
        }

        /* Drop the reference taken before the user callback(s). It is released only now so that the is_closed
         * store above never depends on the continuation table still holding its reference after user code ran. */
        aws_event_stream_rpc_server_continuation_release(continuation);
    } else {
        /* stream id zero: only connection-level message types below AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT are
         * accepted. */
        if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
            message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: a zero stream id was received with an invalid message-type %" PRIu32,
                (void *)connection,
                message_type);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_type_error);
            return;
        }

        if (message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
            /* any non-zero handshake state means a CONNECT has already been processed. */
            if (handshake_state) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: connect received but the handshake is already completed. Only one is allowed.",
                    (void *)connection);
                /* only one connect is allowed. This would be a duplicate. */
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
                return;
            }
            aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
            AWS_LOGF_INFO(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: connect received, connection handshake completion pending the server sending an ack.",
                (void *)connection);
        }

        connection->on_connection_protocol_message(connection, &message_args, connection->user_data);
    }
}