static void s_route_message_by_type()

in source/event_stream_rpc_server.c [808:1046]


/*
 * Routes a single incoming message to the appropriate user callback based on its stream id and message type.
 *
 *  connection      connection the message arrived on. Its handshake_state, continuation_table and
 *                  latest_stream_id are read and updated here.
 *  message         the received message; only its payload pointer and payload length are read.
 *  headers_list    headers for the message; handed to the callbacks through the message args.
 *  stream_id       0 for connection-level messages, non-zero for messages belonging to a stream.
 *  message_type    message type of the received message.
 *  message_flags   message flags; AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM closes the stream.
 *  operation_name  operation name received with the message; must be non-empty when a new stream is opened.
 *
 * Non-zero stream ids are dispatched to a continuation: an existing one looked up in the continuation table, or a
 * newly created one (announced through on_incoming_stream) when the id is exactly latest_stream_id + 1.
 * Zero stream ids are dispatched to on_connection_protocol_message.
 *
 * On any validation or setup failure a connection-level error is sent via s_send_connection_level_error() and the
 * function returns without invoking the message callbacks.
 */
static void s_route_message_by_type(
    struct aws_event_stream_rpc_server_connection *connection,
    struct aws_event_stream_message *message,
    struct aws_array_list *headers_list,
    uint32_t stream_id,
    uint32_t message_type,
    uint32_t message_flags,
    struct aws_byte_cursor operation_name) {
    /* payload_buf wraps the message's payload in place; message_args only points at it, so both are
     * stack-local views handed to the callbacks for the duration of this call. */
    struct aws_byte_buf payload_buf = aws_byte_buf_from_array(
        aws_event_stream_message_payload(message), aws_event_stream_message_payload_len(message));

    struct aws_event_stream_rpc_message_args message_args = {
        .headers = headers_list->data,
        .headers_count = aws_array_list_length(headers_list),
        .payload = &payload_buf,
        .message_flags = message_flags,
        .message_type = message_type,
    };

    size_t handshake_state = aws_atomic_load_int(&connection->handshake_state);

    /* make sure if this is not a CONNECT message being received, the handshake has been completed. */
    if (handshake_state < CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
        message_type != AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_SERVER,
            "id=%p: a message was received on this connection prior to the "
            "connect handshake completing",
            (void *)connection);
        aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
        s_send_connection_level_error(
            connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
        return;
    }

    /* stream_id being non zero ALWAYS indicates APPLICATION_DATA or APPLICATION_ERROR. */
    if (stream_id > 0) {
        AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: stream id %" PRIu32, (void *)connection, stream_id);

        struct aws_event_stream_rpc_server_continuation_token *continuation = NULL;
        if (message_type > AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR) {
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: only application messages can be sent on a stream id, "
                "but this message is the incorrect type",
                (void *)connection);
            aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
            return;
        }

        /* INT32_MAX is the max stream id. */
        if (stream_id > INT32_MAX) {
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id is larger than the max acceptable value",
                (void *)connection);
            aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
            return;
        }

        /* if the stream id is in the past, look it up from the continuation table. If it's not there, that's an
         * error. If it is, find it and notify the user a message arrived */
        if (stream_id <= connection->latest_stream_id) {
            /* This is the normal path for follow-up messages on an open stream, so it is traced, not an error. */
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id is an already seen stream_id, looking for existing continuation",
                (void *)connection);

            struct aws_hash_element *continuation_element = NULL;
            if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
                !continuation_element) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: stream_id does not have a corresponding continuation",
                    (void *)connection);
                aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
                return;
            }

            continuation = continuation_element->value;
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id corresponds to continuation %p",
                (void *)connection,
                (void *)continuation);

            /* Take our own reference for the callback. It is deliberately held until after the terminate-stream
             * handling below, because that code still dereferences the continuation. */
            aws_event_stream_rpc_server_continuation_acquire(continuation);
            continuation->continuation_fn(continuation, &message_args, continuation->user_data);
        } else {
            /* now these are potentially new streams. Make sure they're in bounds, create a new continuation
             * and notify the user the stream has been created, then send them the message. */
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id is unknown, attempting to create a continuation for it",
                (void *)connection);
            if (stream_id != connection->latest_stream_id + 1) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: stream_id is invalid because it's not sequentially increasing",
                    (void *)connection);

                aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
                s_send_connection_level_error(
                    connection,
                    AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR,
                    0,
                    &s_invalid_new_client_stream_id_error);
                return;
            }

            /* new streams must always have an operation name. */
            if (operation_name.len == 0) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: new stream_id encountered, but an operation name was not received",
                    (void *)connection);
                aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_missing_operation_name_error);
                return;
            }

            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: stream_id is a valid new stream. Creating continuation",
                (void *)connection);
            continuation = aws_mem_calloc(connection->allocator, 1, sizeof(*continuation));
            if (!continuation) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: continuation allocation failed with error %s",
                    (void *)connection,
                    aws_error_debug_str(aws_last_error()));
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
                return;
            }

            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: new continuation is %p",
                (void *)connection,
                (void *)continuation);

            /* The continuation keeps the connection alive, and starts with a ref count of 1. The table key is the
             * stream_id field stored inside the continuation itself. */
            continuation->stream_id = stream_id;
            continuation->connection = connection;
            aws_event_stream_rpc_server_connection_acquire(continuation->connection);
            aws_atomic_init_int(&continuation->ref_count, 1);

            if (aws_hash_table_put(&connection->continuation_table, &continuation->stream_id, continuation, NULL)) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: continuation table update failed with error %s",
                    (void *)connection,
                    aws_error_debug_str(aws_last_error()));
                /* continuation release will drop the connection reference as well */
                aws_event_stream_rpc_server_continuation_release(continuation);
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
                return;
            }

            struct aws_event_stream_rpc_server_stream_continuation_options options;
            AWS_ZERO_STRUCT(options);

            /* Take our own reference for the callbacks; see the matching release at the end of this branch of
             * the function. */
            aws_event_stream_rpc_server_continuation_acquire(continuation);
            AWS_LOGF_TRACE(
                AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: invoking on_incoming_stream callback", (void *)connection);
            if (connection->on_incoming_stream(
                    continuation->connection, continuation, operation_name, &options, connection->user_data)) {
                /* NOTE(review): only the reference taken just above is dropped here; the continuation is left in
                 * the continuation table without callbacks set and latest_stream_id is not advanced. Presumably
                 * the table's teardown path copes with that - confirm. */
                aws_event_stream_rpc_server_continuation_release(continuation);
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
                return;
            }
            /* A successful on_incoming_stream must have filled in both callbacks. */
            AWS_FATAL_ASSERT(options.on_continuation);
            AWS_FATAL_ASSERT(options.on_continuation_closed);

            continuation->continuation_fn = options.on_continuation;
            continuation->closed_fn = options.on_continuation_closed;
            continuation->user_data = options.user_data;

            /* The stream only counts as "seen" once the user has accepted it. */
            connection->latest_stream_id = stream_id;
            continuation->continuation_fn(continuation, &message_args, continuation->user_data);
        }

        /* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
        if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: the terminate_stream flag was received for continuation %p, closing",
                (void *)connection,
                (void *)continuation);
            aws_atomic_store_int(&continuation->is_closed, 1U);
            aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
        }

        /* Drop the reference taken before the callbacks ran. This is done last so the continuation is guaranteed
         * to be alive for the is_closed store above, whatever the user callback did with the stream. */
        aws_event_stream_rpc_server_continuation_release(continuation);
    } else {
        /* Stream id 0: only connection-level message types (above APPLICATION_ERROR and below COUNT) are valid. */
        if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
            message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: a zero stream id was received with an invalid message-type %" PRIu32,
                (void *)connection,
                message_type);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_type_error);
            return;
        }

        if (message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
            /* Any non-zero handshake state means a CONNECT has already been processed. */
            if (handshake_state) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_SERVER,
                    "id=%p: connect received but the handshake is already completed. Only one is allowed.",
                    (void *)connection);
                /* only one connect is allowed. This would be a duplicate. */
                /* NOTE(review): this reuses s_connect_not_completed_error as the payload for a duplicate connect;
                 * confirm that is the intended error body. */
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
                return;
            }
            aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
            AWS_LOGF_INFO(
                AWS_LS_EVENT_STREAM_RPC_SERVER,
                "id=%p: connect received, connection handshake completion pending the server sending an ack.",
                (void *)connection);
        }

        /* All valid connection-level messages, CONNECT included, are forwarded to the user. */
        connection->on_connection_protocol_message(connection, &message_args, connection->user_data);
    }
}