in source/client_channel_handler.c [788:915]
uint16_t mqtt_create_request(
struct aws_mqtt_client_connection *connection,
aws_mqtt_send_request_fn *send_request,
void *send_request_ud,
aws_mqtt_op_complete_fn *on_complete,
void *on_complete_ud,
bool noRetry) {
AWS_ASSERT(connection);
AWS_ASSERT(send_request);
struct aws_mqtt_request *next_request = NULL;
bool should_schedule_task = false;
struct aws_channel *channel = NULL;
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) {
mqtt_connection_unlock_synced_data(connection);
/* User requested disconnecting, ensure no new requests are made until the channel finished shutting
* down. */
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"id=%p: Disconnect requested, stop creating any new request until disconnect process finishes.",
(void *)connection);
aws_raise_error(AWS_ERROR_MQTT_CONNECTION_DISCONNECTING);
return 0;
}
if (noRetry && connection->synced_data.state != AWS_MQTT_CLIENT_STATE_CONNECTED) {
mqtt_connection_unlock_synced_data(connection);
/* Not offline queueing QoS 0 publish or PINGREQ. Fail the call. */
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: Not currently connected. No offline queueing for QoS 0 publish or pingreq.",
(void *)connection);
aws_raise_error(AWS_ERROR_MQTT_NOT_CONNECTED);
return 0;
}
/**
* Find a free packet ID.
* QoS 0 PUBLISH packets don't actually need an ID on the wire,
* but we assign them internally anyway just so everything has a unique ID.
*
* Yes, this is an O(N) search.
* We remember the last ID we assigned, so it's O(1) in the common case.
* But it's theoretically possible to reach O(N) where N is just above 64000
* if the user is letting a ton of un-ack'd messages queue up
*/
uint16_t search_start = connection->synced_data.packet_id;
struct aws_hash_element *elem = NULL;
while (true) {
/* Increment ID, watch out for overflow, ID cannot be 0 */
if (connection->synced_data.packet_id == UINT16_MAX) {
connection->synced_data.packet_id = 1;
} else {
connection->synced_data.packet_id++;
}
/* Is there already an outstanding request using this ID? */
aws_hash_table_find(
&connection->synced_data.outstanding_requests_table, &connection->synced_data.packet_id, &elem);
if (elem == NULL) {
/* Found a free ID! Break out of loop */
break;
} else if (connection->synced_data.packet_id == search_start) {
/* Every ID is taken */
mqtt_connection_unlock_synced_data(connection);
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"id=%p: Queue is full. No more packet IDs are available at this time.",
(void *)connection);
aws_raise_error(AWS_ERROR_MQTT_QUEUE_FULL);
return 0;
}
}
next_request = aws_memory_pool_acquire(&connection->synced_data.requests_pool);
if (!next_request) {
mqtt_connection_unlock_synced_data(connection);
return 0;
}
memset(next_request, 0, sizeof(struct aws_mqtt_request));
next_request->packet_id = connection->synced_data.packet_id;
if (aws_hash_table_put(
&connection->synced_data.outstanding_requests_table, &next_request->packet_id, next_request, NULL)) {
/* failed to put the next request into the table */
aws_memory_pool_release(&connection->synced_data.requests_pool, next_request);
mqtt_connection_unlock_synced_data(connection);
return 0;
}
/* Store the request by packet_id */
next_request->allocator = connection->allocator;
next_request->connection = connection;
next_request->initiated = false;
next_request->retryable = !noRetry;
next_request->send_request = send_request;
next_request->send_request_ud = send_request_ud;
next_request->on_complete = on_complete;
next_request->on_complete_ud = on_complete_ud;
aws_channel_task_init(
&next_request->outgoing_task, s_request_outgoing_task, next_request, "mqtt_outgoing_request_task");
if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_CONNECTED) {
aws_linked_list_push_back(&connection->synced_data.pending_requests_list, &next_request->list_node);
} else {
AWS_ASSERT(connection->slot);
AWS_ASSERT(connection->slot->channel);
should_schedule_task = true;
channel = connection->slot->channel;
/* keep the channel alive until the task is scheduled */
aws_channel_acquire_hold(channel);
}
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
if (should_schedule_task) {
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Currently not in the event-loop thread, scheduling a task to send message id %" PRIu16 ".",
(void *)connection,
next_request->packet_id);
aws_channel_schedule_task_now(channel, &next_request->outgoing_task);
/* release the refcount we hold with the protection of lock */
aws_channel_release_hold(channel);
}
return next_request->packet_id;
}