int aws_event_stream_rpc_client_continuation_activate()

in source/event_stream_rpc_client.c [950:1024]


/*
 * Activates a continuation: gives it the connection's next stream id, registers it in the
 * connection's continuation table under that id, and sends the initial message for
 * `operation_name` via s_send_protocol_message().
 *
 * The whole sequence runs with the connection's stream_lock held.
 *
 * continuation   - token to activate; its stream_id must still be 0.
 * operation_name - forwarded (by address) to s_send_protocol_message().
 * message_args   - forwarded unchanged to s_send_protocol_message().
 * flush_fn       - forwarded unchanged to s_send_protocol_message().
 * user_data      - forwarded unchanged to s_send_protocol_message().
 *
 * Returns AWS_OP_SUCCESS once the message has been handed to s_send_protocol_message()
 * successfully; at that point a reference has been acquired on the continuation and the
 * connection's latest_stream_id has been advanced to the continuation's stream id.
 *
 * Returns AWS_OP_ERR otherwise:
 *   - AWS_ERROR_INVALID_STATE is raised if the continuation already has a non-zero stream id.
 *   - AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED is raised if the connection is not open.
 *   - If storing the continuation or sending the message fails, no new error is raised here
 *     (the failing call is expected to have raised one) and the continuation's stream_id is
 *     reset to 0; latest_stream_id is left untouched.
 */
int aws_event_stream_rpc_client_continuation_activate(
    struct aws_event_stream_rpc_client_continuation_token *continuation,
    struct aws_byte_cursor operation_name,
    const struct aws_event_stream_rpc_message_args *message_args,
    aws_event_stream_rpc_client_message_flush_fn *flush_fn,
    void *user_data) {

    AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: activating continuation", (void *)continuation);
    int ret_val = AWS_OP_ERR;

    aws_mutex_lock(&continuation->connection->stream_lock);

    /* A non-zero stream id means this continuation went through activation before. */
    if (continuation->stream_id) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream has already been activated", (void *)continuation);
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        goto clean_up;
    }

    /* Even though is_open is atomic, we need to hold a lock while checking it.
     * This lets us coordinate with code that sets is_open to false. */
    if (!aws_event_stream_rpc_client_connection_is_open(continuation->connection)) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream's connection is not open", (void *)continuation);
        aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
        goto clean_up;
    }

    /* We cannot update the connection's stream id until we're certain the message at least made it to the wire,
     * because the next stream id must be consecutively increasing by 1. So send the message, then update the
     * connection state once we've made it to the wire.
     *
     * NOTE(review): there is no guard against latest_stream_id + 1 wrapping around to 0, which would be
     * indistinguishable from "not activated" -- confirm whether ids can realistically get that high. */
    continuation->stream_id = continuation->connection->latest_stream_id + 1;
    AWS_LOGF_DEBUG(
        AWS_LS_EVENT_STREAM_RPC_CLIENT,
        "id=%p: continuation's new stream id is %" PRIu32,
        (void *)continuation,
        continuation->stream_id);

    /* The table key is the address of the continuation's own stream_id field. */
    if (aws_hash_table_put(
            &continuation->connection->continuation_table, &continuation->stream_id, continuation, NULL)) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_CLIENT,
            "id=%p: storing the new stream failed with %s",
            (void *)continuation,
            aws_error_debug_str(aws_last_error()));
        /* Roll back so the continuation reads as "not activated" again. */
        continuation->stream_id = 0;
        goto clean_up;
    }

    if (s_send_protocol_message(
            continuation->connection,
            continuation,
            &operation_name,
            message_args,
            continuation->stream_id,
            flush_fn,
            user_data)) {
        /* Remove the table entry while stream_id still holds the key it was stored under,
         * and only then reset the id. */
        aws_hash_table_remove(&continuation->connection->continuation_table, &continuation->stream_id, NULL, NULL);
        continuation->stream_id = 0;
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_CLIENT,
            "id=%p: failed to flush the new stream to the channel with error %s",
            (void *)continuation,
            aws_error_debug_str(aws_last_error()));
        goto clean_up;
    }

    /* The continuation table gets a ref count on the continuation. Take it here. */
    aws_event_stream_rpc_client_continuation_acquire(continuation);

    continuation->connection->latest_stream_id = continuation->stream_id;
    ret_val = AWS_OP_SUCCESS;

clean_up:
    /* Single exit point: every path above, success or failure, releases the lock here. */
    aws_mutex_unlock(&continuation->connection->stream_lock);
    return ret_val;
}