static int s_send_protocol_message()

in source/event_stream_rpc_server.c [525:670]


/*
 * Builds an event-stream message from message_args and hands it to the connection's channel handler for writing.
 *
 * The outgoing message carries every header supplied in message_args plus three headers added here:
 * the message type, the message flags and the stream id.
 *
 * connection    - connection to send on; a reference is acquired for the duration of the write.
 * continuation  - optional (may be NULL); when set, a reference is acquired for the duration of the write.
 * message_args  - headers, payload, type and flags of the message to send. Must not be NULL.
 * stream_id     - value written into the stream-id header.
 * flush_fn      - stored in the send args alongside user_data and handed to s_on_protocol_message_written_fn.
 * user_data     - opaque pointer stored alongside flush_fn.
 *
 * Returns AWS_OP_SUCCESS once the message has been handed to the channel handler. Fails with
 * AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR if the connect handshake has not completed and the message type is
 * ordered below connect ack, and returns AWS_OP_ERR on allocation, message-initialization or write failure.
 * On every failure path all references and memory taken by this function are given back.
 */
static int s_send_protocol_message(
    struct aws_event_stream_rpc_server_connection *connection,
    struct aws_event_stream_rpc_server_continuation_token *continuation,
    const struct aws_event_stream_rpc_message_args *message_args,
    int32_t stream_id,
    aws_event_stream_rpc_server_message_flush_fn *flush_fn,
    void *user_data) {

    size_t connect_handshake_state = aws_atomic_load_int(&connection->handshake_state);
    AWS_LOGF_TRACE(
        AWS_LS_EVENT_STREAM_RPC_SERVER,
        "id=%p: connect handshake state %zu",
        (void *)connection,
        connect_handshake_state);
    /* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
     * no messages other than connect and connect ack are allowed until the handshake state reaches
     * CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED. */
    if (connect_handshake_state != CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
        message_args->message_type < AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
        AWS_LOGF_TRACE(
            AWS_LS_EVENT_STREAM_RPC_SERVER,
            "id=%p: invalid state, a message was received prior to connect handshake completion",
            (void *)connection);
        return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
    }

    struct event_stream_connection_send_message_args *args =
        aws_mem_calloc(connection->allocator, 1, sizeof(struct event_stream_connection_send_message_args));

    /* validate the allocation itself; nothing has been acquired yet so a plain return is enough. */
    if (!args) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_SERVER,
            "id=%p: allocation of callback args failed with error %s",
            (void *)connection,
            aws_error_debug_str(aws_last_error()));
        return AWS_OP_ERR;
    }

    args->allocator = connection->allocator;
    args->user_data = user_data;
    args->message_type = message_args->message_type;
    args->connection = connection;
    args->flush_fn = flush_fn;

    if (continuation) {
        args->continuation = continuation;
        /* this reference is released again in the failure paths at the bottom of this function. */
        aws_event_stream_rpc_server_continuation_acquire(continuation);

        if (message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: continuation with terminate stream flag was specified closing",
                (void *)continuation);
            args->end_stream = true;
        }
    }

    if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
        AWS_LOGF_INFO(
            AWS_LS_EVENT_STREAM_RPC_SERVER,
            "id=%p: sending connect ack message, the connect handshake is completed",
            (void *)connection);
        /* NOTE(review): the handshake state is advanced before the write is attempted and is not rolled back
         * if a later step in this function fails. */
        aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);

        if (!(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: connection ack was rejected closing connection",
                (void *)connection);
            args->terminate_connection = true;
        }
    }

    /* room for the caller's headers plus the message-type, message-flags and stream-id headers added below. */
    size_t headers_count = message_args->headers_count + 3;
    struct aws_array_list headers_list;
    AWS_ZERO_STRUCT(headers_list);

    if (aws_array_list_init_dynamic(
            &headers_list, connection->allocator, headers_count, sizeof(struct aws_event_stream_header_value_pair))) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_SERVER,
            "id=%p: allocation of headers failed with error %s",
            (void *)connection,
            aws_error_debug_str(aws_last_error()));
        goto args_allocated_before_failure;
    }

    /* since we preallocated the space for the headers, these can't fail, but we'll go ahead and assert on them just
     * in case */
    for (size_t i = 0; i < message_args->headers_count; ++i) {
        AWS_FATAL_ASSERT(!aws_array_list_push_back(&headers_list, &message_args->headers[i]));
    }

    AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
        &headers_list,
        (const char *)aws_event_stream_rpc_message_type_name.ptr,
        (uint8_t)aws_event_stream_rpc_message_type_name.len,
        message_args->message_type));
    AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
        &headers_list,
        (const char *)aws_event_stream_rpc_message_flags_name.ptr,
        (uint8_t)aws_event_stream_rpc_message_flags_name.len,
        message_args->message_flags));
    AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
        &headers_list,
        (const char *)aws_event_stream_rpc_stream_id_name.ptr,
        (uint8_t)aws_event_stream_rpc_stream_id_name.len,
        stream_id));

    int message_init_err_code =
        aws_event_stream_message_init(&args->message, connection->allocator, &headers_list, message_args->payload);
    /* the header list is only needed to build the message; drop it regardless of the outcome. */
    aws_array_list_clean_up(&headers_list);

    if (message_init_err_code) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_SERVER,
            "id=%p: initialization of message failed with error %s",
            (void *)connection,
            aws_error_debug_str(aws_last_error()));
        goto args_allocated_before_failure;
    }

    /* hold the connection while the write is outstanding; on success args (and the references recorded in it)
     * are handed to s_on_protocol_message_written_fn. */
    aws_event_stream_rpc_server_connection_acquire(connection);

    if (aws_event_stream_channel_handler_write_message(
            connection->event_stream_handler, &args->message, s_on_protocol_message_written_fn, args)) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_SERVER,
            "id=%p: message send failed with error %s",
            (void *)connection,
            aws_error_debug_str(aws_last_error()));
        goto message_initialized_before_failure;
    }

    return AWS_OP_SUCCESS;

message_initialized_before_failure:
    /* only this path has an initialized message and an acquired connection reference to undo. */
    aws_event_stream_message_clean_up(&args->message);
    aws_event_stream_rpc_server_connection_release(connection);

args_allocated_before_failure:
    /* balance the continuation acquire performed above, then free the send args. */
    if (continuation) {
        aws_event_stream_rpc_server_continuation_release(continuation);
    }
    aws_mem_release(args->allocator, args);

    return AWS_OP_ERR;
}