in source/event_stream_rpc_server.c [525:670]
static int s_send_protocol_message(
struct aws_event_stream_rpc_server_connection *connection,
struct aws_event_stream_rpc_server_continuation_token *continuation,
const struct aws_event_stream_rpc_message_args *message_args,
int32_t stream_id,
aws_event_stream_rpc_server_message_flush_fn *flush_fn,
void *user_data) {
size_t connect_handshake_state = aws_atomic_load_int(&connection->handshake_state);
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: connect handshake state %zu",
(void *)connection,
connect_handshake_state);
/* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
* no messages other than connect and connect ack are allowed until this count reaches 2. */
if (connect_handshake_state != CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
message_args->message_type < AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: invalid state, a message was received prior to connect handshake completion",
(void *)connection);
return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
}
struct event_stream_connection_send_message_args *args =
aws_mem_calloc(connection->allocator, 1, sizeof(struct event_stream_connection_send_message_args));
if (!message_args) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: allocation of callback args failed with error %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
return AWS_OP_ERR;
}
args->allocator = connection->allocator;
args->user_data = user_data;
args->message_type = message_args->message_type;
args->connection = connection;
args->flush_fn = flush_fn;
if (continuation) {
args->continuation = continuation;
aws_event_stream_rpc_server_continuation_acquire(continuation);
if (message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: continuation with terminate stream flag was specified closing",
(void *)continuation);
args->end_stream = true;
}
}
if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
AWS_LOGF_INFO(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: sending connect ack message, the connect handshake is completed",
(void *)connection);
aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
if (!(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: connection ack was rejected closing connection",
(void *)connection);
args->terminate_connection = true;
}
}
args->flush_fn = flush_fn;
size_t headers_count = message_args->headers_count + 3;
struct aws_array_list headers_list;
AWS_ZERO_STRUCT(headers_list);
if (aws_array_list_init_dynamic(
&headers_list, connection->allocator, headers_count, sizeof(struct aws_event_stream_header_value_pair))) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: allocation of headers failed with error %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
goto args_allocated_before_failure;
}
/* since we preallocated the space for the headers, these can't fail, but we'll go ahead an assert on them just in
* case */
for (size_t i = 0; i < message_args->headers_count; ++i) {
AWS_FATAL_ASSERT(!aws_array_list_push_back(&headers_list, &message_args->headers[i]));
}
AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
&headers_list,
(const char *)aws_event_stream_rpc_message_type_name.ptr,
(uint8_t)aws_event_stream_rpc_message_type_name.len,
message_args->message_type));
AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
&headers_list,
(const char *)aws_event_stream_rpc_message_flags_name.ptr,
(uint8_t)aws_event_stream_rpc_message_flags_name.len,
message_args->message_flags));
AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
&headers_list,
(const char *)aws_event_stream_rpc_stream_id_name.ptr,
(uint8_t)aws_event_stream_rpc_stream_id_name.len,
stream_id));
int message_init_err_code =
aws_event_stream_message_init(&args->message, connection->allocator, &headers_list, message_args->payload);
aws_array_list_clean_up(&headers_list);
if (message_init_err_code) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: initialization of message failed with error %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
goto args_allocated_before_failure;
}
aws_event_stream_rpc_server_connection_acquire(connection);
if (aws_event_stream_channel_handler_write_message(
connection->event_stream_handler, &args->message, s_on_protocol_message_written_fn, args)) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_SERVER,
"id=%p: message send failed with error %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
goto message_initialized_before_failure;
}
return AWS_OP_SUCCESS;
message_initialized_before_failure:
aws_event_stream_message_clean_up(&args->message);
args_allocated_before_failure:
aws_mem_release(args->allocator, args);
aws_event_stream_rpc_server_connection_release(connection);
return AWS_OP_ERR;
}