in source/event_stream_rpc_client.c [459:632]
static int s_send_protocol_message(
struct aws_event_stream_rpc_client_connection *connection,
struct aws_event_stream_rpc_client_continuation_token *continuation,
struct aws_byte_cursor *operation_name,
const struct aws_event_stream_rpc_message_args *message_args,
int32_t stream_id,
aws_event_stream_rpc_client_message_flush_fn *flush_fn,
void *user_data) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: sending message. continuation: %p, stream id %" PRId32,
(void *)connection,
(void *)continuation,
stream_id);
size_t connect_handshake_state = aws_atomic_load_int(&connection->handshake_state);
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: handshake completion value %zu",
(void *)connection,
connect_handshake_state);
/* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
* no messages other than connect and connect ack are allowed until this count reaches 2. */
if (connect_handshake_state != CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
message_args->message_type < AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: handshake not completed, only a connect message can be sent.",
(void *)connection);
return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
}
struct event_stream_connection_send_message_args *args =
aws_mem_calloc(connection->allocator, 1, sizeof(struct event_stream_connection_send_message_args));
if (!message_args) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: failed to allocate callback arguments %s.",
(void *)connection,
aws_error_debug_str(aws_last_error()));
return AWS_OP_ERR;
}
args->allocator = connection->allocator;
args->user_data = user_data;
args->message_type = message_args->message_type;
args->connection = connection;
args->flush_fn = flush_fn;
if (continuation) {
AWS_LOGF_TRACE(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: sending message on continuation %p",
(void *)connection,
(void *)continuation);
args->continuation = continuation;
aws_event_stream_rpc_client_continuation_acquire(continuation);
if (message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p:end stream flag was specified on continuation %p",
(void *)connection,
(void *)continuation);
args->end_stream = true;
}
}
if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK &&
!(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: terminating connection", (void *)connection);
args->terminate_connection = true;
}
if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: sending connect message, waiting on connect ack",
(void *)connection);
aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
}
args->flush_fn = flush_fn;
size_t headers_count = operation_name ? message_args->headers_count + 4 : message_args->headers_count + 3;
struct aws_array_list headers_list;
AWS_ZERO_STRUCT(headers_list);
if (aws_array_list_init_dynamic(
&headers_list, connection->allocator, headers_count, sizeof(struct aws_event_stream_header_value_pair))) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: an error occurred while initializing the headers list %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
goto args_allocated_before_failure;
}
/* since we preallocated the space for the headers, these can't fail, but we'll go ahead an assert on them just in
* case */
for (size_t i = 0; i < message_args->headers_count; ++i) {
AWS_FATAL_ASSERT(!aws_array_list_push_back(&headers_list, &message_args->headers[i]));
}
AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
&headers_list,
(const char *)aws_event_stream_rpc_message_type_name.ptr,
(uint8_t)aws_event_stream_rpc_message_type_name.len,
message_args->message_type));
AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
&headers_list,
(const char *)aws_event_stream_rpc_message_flags_name.ptr,
(uint8_t)aws_event_stream_rpc_message_flags_name.len,
message_args->message_flags));
AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
&headers_list,
(const char *)aws_event_stream_rpc_stream_id_name.ptr,
(uint8_t)aws_event_stream_rpc_stream_id_name.len,
stream_id));
if (operation_name) {
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: operation name specified " PRInSTR,
(void *)connection,
AWS_BYTE_CURSOR_PRI(*operation_name));
AWS_FATAL_ASSERT(!aws_event_stream_add_string_header(
&headers_list,
(const char *)aws_event_stream_rpc_operation_name.ptr,
(uint8_t)aws_event_stream_rpc_operation_name.len,
(const char *)operation_name->ptr,
(uint16_t)operation_name->len,
0));
}
int message_init_err_code =
aws_event_stream_message_init(&args->message, connection->allocator, &headers_list, message_args->payload);
aws_array_list_clean_up(&headers_list);
if (message_init_err_code) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: message init failed with error %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
goto args_allocated_before_failure;
}
aws_event_stream_rpc_client_connection_acquire(connection);
if (aws_event_stream_channel_handler_write_message(
connection->event_stream_handler, &args->message, s_on_protocol_message_written_fn, args)) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: writing message failed with error %s",
(void *)connection,
aws_error_debug_str(aws_last_error()));
goto message_initialized_before_failure;
}
return AWS_OP_SUCCESS;
message_initialized_before_failure:
aws_event_stream_message_clean_up(&args->message);
args_allocated_before_failure:
aws_mem_release(args->allocator, args);
aws_event_stream_rpc_client_connection_release(connection);
return AWS_OP_ERR;
}