in source/event_stream_rpc_client.c [950:1024]
int aws_event_stream_rpc_client_continuation_activate(
struct aws_event_stream_rpc_client_continuation_token *continuation,
struct aws_byte_cursor operation_name,
const struct aws_event_stream_rpc_message_args *message_args,
aws_event_stream_rpc_client_message_flush_fn *flush_fn,
void *user_data) {
AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: activating continuation", (void *)continuation);
int ret_val = AWS_OP_ERR;
aws_mutex_lock(&continuation->connection->stream_lock);
if (continuation->stream_id) {
AWS_LOGF_ERROR(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream has already been activated", (void *)continuation)
aws_raise_error(AWS_ERROR_INVALID_STATE);
goto clean_up;
}
/* Even though is_open is atomic, we need to hold a lock while checking it.
* This lets us coordinate with code that sets is_open to false. */
if (!aws_event_stream_rpc_client_connection_is_open(continuation->connection)) {
AWS_LOGF_ERROR(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream's connection is not open", (void *)continuation)
aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
goto clean_up;
}
/* we cannot update the connection's stream id until we're certain the message at least made it to the wire, because
* the next stream id must be consecutively increasing by 1. So send the message then update the connection state
* once we've made it to the wire. */
continuation->stream_id = continuation->connection->latest_stream_id + 1;
AWS_LOGF_DEBUG(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: continuation's new stream id is %" PRIu32,
(void *)continuation,
continuation->stream_id);
if (aws_hash_table_put(
&continuation->connection->continuation_table, &continuation->stream_id, continuation, NULL)) {
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: storing the new stream failed with %s",
(void *)continuation,
aws_error_debug_str(aws_last_error()));
continuation->stream_id = 0;
goto clean_up;
}
if (s_send_protocol_message(
continuation->connection,
continuation,
&operation_name,
message_args,
continuation->stream_id,
flush_fn,
user_data)) {
aws_hash_table_remove(&continuation->connection->continuation_table, &continuation->stream_id, NULL, NULL);
continuation->stream_id = 0;
AWS_LOGF_ERROR(
AWS_LS_EVENT_STREAM_RPC_CLIENT,
"id=%p: failed to flush the new stream to the channel with error %s",
(void *)continuation,
aws_error_debug_str(aws_last_error()));
goto clean_up;
}
/* The continuation table gets a ref count on the continuation. Take it here. */
aws_event_stream_rpc_client_continuation_acquire(continuation);
continuation->connection->latest_stream_id = continuation->stream_id;
ret_val = AWS_OP_SUCCESS;
clean_up:
aws_mutex_unlock(&continuation->connection->stream_lock);
return ret_val;
}