uint16_t mqtt_create_request()

in source/client_channel_handler.c [788:915]
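
In short: the function reserves a free MQTT packet ID under the connection lock, registers the new request in outstanding_requests_table keyed by that ID, and then either queues it in pending_requests_list (when not connected) or schedules its outgoing task on the channel's event loop. It returns the assigned packet ID, or 0 on failure after raising an error: disconnect in progress, no offline queueing for noRetry requests, no free packet IDs, or an allocation/table failure.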


uint16_t mqtt_create_request(
    struct aws_mqtt_client_connection *connection,
    aws_mqtt_send_request_fn *send_request,
    void *send_request_ud,
    aws_mqtt_op_complete_fn *on_complete,
    void *on_complete_ud,
    bool noRetry) {

    AWS_ASSERT(connection);
    AWS_ASSERT(send_request);
    struct aws_mqtt_request *next_request = NULL;
    bool should_schedule_task = false;
    struct aws_channel *channel = NULL;
    { /* BEGIN CRITICAL SECTION */
        mqtt_connection_lock_synced_data(connection);
        if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) {
            mqtt_connection_unlock_synced_data(connection);
            /* User requested a disconnect; ensure no new requests are made until the channel has finished shutting
             * down. */
            AWS_LOGF_ERROR(
                AWS_LS_MQTT_CLIENT,
                "id=%p: Disconnect requested, stop creating any new request until disconnect process finishes.",
                (void *)connection);
            aws_raise_error(AWS_ERROR_MQTT_CONNECTION_DISCONNECTING);
            return 0;
        }

        if (noRetry && connection->synced_data.state != AWS_MQTT_CLIENT_STATE_CONNECTED) {
            mqtt_connection_unlock_synced_data(connection);
            /* No offline queueing for QoS 0 publishes or PINGREQ; fail the call. */
            AWS_LOGF_DEBUG(
                AWS_LS_MQTT_CLIENT,
                "id=%p: Not currently connected. No offline queueing for QoS 0 publish or pingreq.",
                (void *)connection);
            aws_raise_error(AWS_ERROR_MQTT_NOT_CONNECTED);
            return 0;
        }
        /**
         * Find a free packet ID.
         * QoS 0 PUBLISH packets don't actually need an ID on the wire,
         * but we assign them internally anyway just so everything has a unique ID.
         *
         * Yes, this is an O(N) search.
         * We remember the last ID we assigned, so it's O(1) in the common case.
         * But it's theoretically possible to reach O(N), where N is just above 64000,
         * if the user lets a ton of un-ack'd messages queue up.
         */
        uint16_t search_start = connection->synced_data.packet_id;
        struct aws_hash_element *elem = NULL;
        while (true) {
            /* Increment ID, watch out for overflow, ID cannot be 0 */
            if (connection->synced_data.packet_id == UINT16_MAX) {
                connection->synced_data.packet_id = 1;
            } else {
                connection->synced_data.packet_id++;
            }

            /* Is there already an outstanding request using this ID? */
            aws_hash_table_find(
                &connection->synced_data.outstanding_requests_table, &connection->synced_data.packet_id, &elem);

            if (elem == NULL) {
                /* Found a free ID! Break out of loop */
                break;
            } else if (connection->synced_data.packet_id == search_start) {
                /* Every ID is taken */
                mqtt_connection_unlock_synced_data(connection);
                AWS_LOGF_ERROR(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: Queue is full. No more packet IDs are available at this time.",
                    (void *)connection);
                aws_raise_error(AWS_ERROR_MQTT_QUEUE_FULL);
                return 0;
            }
        }

        next_request = aws_memory_pool_acquire(&connection->synced_data.requests_pool);
        if (!next_request) {
            mqtt_connection_unlock_synced_data(connection);
            return 0;
        }
        memset(next_request, 0, sizeof(struct aws_mqtt_request));

        next_request->packet_id = connection->synced_data.packet_id;

        /* Store the request in the outstanding requests table, keyed by packet_id */
        if (aws_hash_table_put(
                &connection->synced_data.outstanding_requests_table, &next_request->packet_id, next_request, NULL)) {
            /* Failed to put the next request into the table */
            aws_memory_pool_release(&connection->synced_data.requests_pool, next_request);
            mqtt_connection_unlock_synced_data(connection);
            return 0;
        }

        next_request->allocator = connection->allocator;
        next_request->connection = connection;
        next_request->initiated = false;
        next_request->retryable = !noRetry;
        next_request->send_request = send_request;
        next_request->send_request_ud = send_request_ud;
        next_request->on_complete = on_complete;
        next_request->on_complete_ud = on_complete_ud;
        aws_channel_task_init(
            &next_request->outgoing_task, s_request_outgoing_task, next_request, "mqtt_outgoing_request_task");
        if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_CONNECTED) {
            aws_linked_list_push_back(&connection->synced_data.pending_requests_list, &next_request->list_node);
        } else {
            AWS_ASSERT(connection->slot);
            AWS_ASSERT(connection->slot->channel);
            should_schedule_task = true;
            channel = connection->slot->channel;
            /* keep the channel alive until the task is scheduled */
            aws_channel_acquire_hold(channel);
        }
        mqtt_connection_unlock_synced_data(connection);
    } /* END CRITICAL SECTION */
    if (should_schedule_task) {
        AWS_LOGF_TRACE(
            AWS_LS_MQTT_CLIENT,
            "id=%p: Currently not in the event-loop thread, scheduling a task to send message id %" PRIu16 ".",
            (void *)connection,
            next_request->packet_id);
        aws_channel_schedule_task_now(channel, &next_request->outgoing_task);
        /* Release the hold we acquired while the lock was held */
        aws_channel_release_hold(channel);
    }

    return next_request->packet_id;
}
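
For context, a hedged sketch of a call site. It shows how a QoS 1 publish path might hand its packet to this function, pairing a send callback (dispatched later via the outgoing_task initialized above) with a completion callback. The callback shapes are inferred from the parameters this function stores (aws_mqtt_send_request_fn / aws_mqtt_op_complete_fn plus their userdata pointers); the s_example_* names, the my_publish_args struct, and the AWS_MQTT_CLIENT_REQUEST_* return values are assumptions for illustration, not verified against the library headers.

/* Hypothetical per-publish state carried through both callbacks. */
struct my_publish_args {
    struct aws_byte_cursor topic;
    struct aws_byte_cursor payload;
};

/* Assumed shape of aws_mqtt_send_request_fn: invoked on the channel thread when
 * the outgoing task runs; the return value says whether the request stays
 * outstanding (waiting for an ack) or is already finished. */
static enum aws_mqtt_client_request_state s_example_send_publish(
    uint16_t packet_id,
    bool is_first_attempt,
    void *userdata) {

    struct my_publish_args *args = userdata;
    (void)args;
    (void)is_first_attempt;

    /* Encode and write the PUBLISH packet for packet_id here... */

    /* QoS 1 stays outstanding until the matching PUBACK arrives. */
    return AWS_MQTT_CLIENT_REQUEST_ONGOING;
}

/* Assumed shape of aws_mqtt_op_complete_fn: reports the final result of the request. */
static void s_example_on_publish_complete(
    struct aws_mqtt_client_connection *connection,
    uint16_t packet_id,
    int error_code,
    void *userdata) {

    (void)connection;
    (void)packet_id;
    (void)userdata;
    /* Surface error_code to the application here. */
}

/* Hypothetical call site inside a QoS 1 publish operation. */
static uint16_t s_example_start_publish(
    struct aws_mqtt_client_connection *connection,
    struct my_publish_args *publish_args) {

    /* Returns 0 on failure with aws_last_error() set; otherwise the packet ID
     * now registered in outstanding_requests_table. */
    return mqtt_create_request(
        connection,
        s_example_send_publish,
        publish_args,
        s_example_on_publish_complete,
        publish_args,
        false /* noRetry: QoS 1, so offline queueing and retries are allowed */);
}

A QoS 0 publish or PINGREQ would pass noRetry = true instead, which is exactly the case the function refuses to queue while offline.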