static void s_route_message_by_type()

in source/event_stream_rpc_client.c [685:805]


static void s_route_message_by_type(
    struct aws_event_stream_rpc_client_connection *connection,
    struct aws_event_stream_message *message,
    struct aws_array_list *headers_list,
    uint32_t stream_id,
    uint32_t message_type,
    uint32_t message_flags) {
    struct aws_byte_buf payload_buf = aws_byte_buf_from_array(
        aws_event_stream_message_payload(message), aws_event_stream_message_payload_len(message));

    struct aws_event_stream_rpc_message_args message_args = {
        .headers = headers_list->data,
        .headers_count = aws_array_list_length(headers_list),
        .payload = &payload_buf,
        .message_flags = message_flags,
        .message_type = message_type,
    };

    size_t handshake_complete = aws_atomic_load_int(&connection->handshake_state);

    /* make sure if this is not a CONNECT message being received, the handshake has been completed. */
    if (handshake_complete < CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
        message_type != AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
        AWS_LOGF_ERROR(
            AWS_LS_EVENT_STREAM_RPC_CLIENT,
            "id=%p: a message was received on this connection prior to the "
            "connect handshake completing",
            (void *)connection);
        aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
        s_send_connection_level_error(
            connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
        return;
    }

    /* stream_id being non zero ALWAYS indicates APPLICATION_DATA or APPLICATION_ERROR. */
    if (stream_id > 0) {
        AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream id %" PRIu32, (void *)connection, stream_id);
        struct aws_event_stream_rpc_client_continuation_token *continuation = NULL;
        if (message_type > AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR) {
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_CLIENT,
                "id=%p: only application messages can be sent on a stream id, "
                "but this message is the incorrect type",
                (void *)connection);
            aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
            return;
        }

        aws_mutex_lock(&connection->stream_lock);
        struct aws_hash_element *continuation_element = NULL;
        if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
            !continuation_element) {
            aws_mutex_unlock(&connection->stream_lock);
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_CLIENT,
                "id=%p: a stream id was received that was not created by this client",
                (void *)connection);
            aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
            return;
        }

        aws_mutex_unlock(&connection->stream_lock);

        continuation = continuation_element->value;
        aws_event_stream_rpc_client_continuation_acquire(continuation);
        continuation->continuation_fn(continuation, &message_args, continuation->user_data);
        aws_event_stream_rpc_client_continuation_release(continuation);

        /* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
        if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
            AWS_LOGF_DEBUG(
                AWS_LS_EVENT_STREAM_RPC_CLIENT,
                "id=%p: the terminate stream flag was specified for continuation %p",
                (void *)connection,
                (void *)continuation);
            aws_atomic_store_int(&continuation->is_closed, 1U);
            aws_mutex_lock(&connection->stream_lock);
            aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
            aws_mutex_unlock(&connection->stream_lock);

            /* Note that we do not invoke callback while holding lock */
            s_complete_continuation(continuation);
        }
    } else {
        if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
            message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
            AWS_LOGF_ERROR(
                AWS_LS_EVENT_STREAM_RPC_CLIENT,
                "id=%p: a zero stream id was received with an invalid message-type %" PRIu32,
                (void *)connection,
                message_type);
            s_send_connection_level_error(
                connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_type_error);
            return;
        }

        if (message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
            if (handshake_complete != CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED) {
                AWS_LOGF_ERROR(
                    AWS_LS_EVENT_STREAM_RPC_CLIENT,
                    "id=%p: connect ack received but the handshake is already completed. Only one is allowed.",
                    (void *)connection);
                /* only one connect is allowed. This would be a duplicate. */
                s_send_connection_level_error(
                    connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
                return;
            }
            aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
            AWS_LOGF_INFO(
                AWS_LS_EVENT_STREAM_RPC_CLIENT,
                "id=%p: connect ack received, connection handshake completed",
                (void *)connection);
        }

        connection->on_connection_protocol_message(connection, &message_args, connection->user_data);
    }
}