in source/event_stream_rpc_client.c [685:805]
static void s_route_message_by_type(
struct aws_event_stream_rpc_client_connection *connection,
struct aws_event_stream_message *message,
struct aws_array_list *headers_list,
uint32_t stream_id,
uint32_t message_type,
uint32_t message_flags) {
struct aws_byte_buf payload_buf = aws_byte_buf_from_array(
aws_event_stream_message_payload(message), aws_event_stream_message_payload_len(message));
struct aws_event_stream_rpc_message_args message_args = {
.headers = headers_list->data,
.headers_count = aws_array_list_length(headers_list),
.payload = &payload_buf,
.message_flags = message_flags,
.message_type = message_type,
};
size_t handshake_complete = aws_atomic_load_int(&connection->handshake_state);
/* make sure if this is not a CONNECT message being received, the handshake has been completed. */
if (handshake_complete < CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
message_type != AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: a message was received on this connection prior to the "
"connect handshake completing",
(void *)connection);
aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
return;
}
/* stream_id being non zero ALWAYS indicates APPLICATION_DATA or APPLICATION_ERROR. */
if (stream_id > 0) {
AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream id %" PRIu32, (void *)connection, stream_id);
struct aws_event_stream_rpc_client_continuation_token *continuation = NULL;
if (message_type > AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: only application messages can be sent on a stream id, "
"but this message is the incorrect type",
(void *)connection);
aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
return;
}
aws_mutex_lock(&connection->stream_lock);
struct aws_hash_element *continuation_element = NULL;
if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
!continuation_element) {
aws_mutex_unlock(&connection->stream_lock);
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: a stream id was received that was not created by this client",
(void *)connection);
aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
return;
}
aws_mutex_unlock(&connection->stream_lock);
continuation = continuation_element->value;
aws_event_stream_rpc_client_continuation_acquire(continuation);
continuation->continuation_fn(continuation, &message_args, continuation->user_data);
aws_event_stream_rpc_client_continuation_release(continuation);
/* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: the terminate stream flag was specified for continuation %p",
(void *)connection,
(void *)continuation);
aws_atomic_store_int(&continuation->is_closed, 1U);
aws_mutex_lock(&connection->stream_lock);
aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
aws_mutex_unlock(&connection->stream_lock);
/* Note that we do not invoke callback while holding lock */
s_complete_continuation(continuation);
}
} else {
if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: a zero stream id was received with an invalid message-type %" PRIu32,
(void *)connection,
message_type);
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_type_error);
return;
}
if (message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
if (handshake_complete != CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: connect ack received but the handshake is already completed. Only one is allowed.",
(void *)connection);
/* only one connect is allowed. This would be a duplicate. */
s_send_connection_level_error(
connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
return;
}
aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
AWS_LOGF_INFO(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: connect ack received, connection handshake completed",
(void *)connection);
}
connection->on_connection_protocol_message(connection, &message_args, connection->user_data);
}
}