in source/client.c [1706:1803]
static enum aws_mqtt_client_request_state s_subscribe_send(uint16_t packet_id, bool is_first_attempt, void *userdata) {
(void)is_first_attempt;
struct subscribe_task_arg *task_arg = userdata;
bool initing_packet = task_arg->subscribe.fixed_header.packet_type == 0;
struct aws_io_message *message = NULL;
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Attempting send of subscribe %" PRIu16 " (%s)",
(void *)task_arg->connection,
packet_id,
is_first_attempt ? "first attempt" : "resend");
if (initing_packet) {
/* Init the subscribe packet */
if (aws_mqtt_packet_subscribe_init(&task_arg->subscribe, task_arg->connection->allocator, packet_id)) {
return AWS_MQTT_CLIENT_REQUEST_ERROR;
}
}
const size_t num_topics = aws_array_list_length(&task_arg->topics);
if (num_topics <= 0) {
aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
return AWS_MQTT_CLIENT_REQUEST_ERROR;
}
AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, num_topics * aws_mqtt_topic_tree_action_size);
struct aws_array_list transaction;
aws_array_list_init_static(&transaction, transaction_buf, num_topics, aws_mqtt_topic_tree_action_size);
for (size_t i = 0; i < num_topics; ++i) {
struct subscribe_task_topic *topic = NULL;
aws_array_list_get_at(&task_arg->topics, &topic, i);
AWS_ASSUME(topic); /* We know we're within bounds */
if (initing_packet) {
if (aws_mqtt_packet_subscribe_add_topic(&task_arg->subscribe, topic->request.topic, topic->request.qos)) {
goto handle_error;
}
}
if (!task_arg->tree_updated) {
if (aws_mqtt_topic_tree_transaction_insert(
&task_arg->connection->thread_data.subscriptions,
&transaction,
topic->filter,
topic->request.qos,
s_on_publish_client_wrapper,
s_task_topic_release,
topic)) {
goto handle_error;
}
/* If insert succeed, acquire the refcount */
aws_ref_count_acquire(&topic->ref_count);
}
}
message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->subscribe.fixed_header);
if (!message) {
goto handle_error;
}
if (aws_mqtt_packet_subscribe_encode(&message->message_data, &task_arg->subscribe)) {
goto handle_error;
}
/* This is not necessarily a fatal error; if the subscribe fails, it'll just retry. Still need to clean up though.
*/
if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
aws_mem_release(message->allocator, message);
}
if (!task_arg->tree_updated) {
aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
task_arg->tree_updated = true;
}
aws_array_list_clean_up(&transaction);
return AWS_MQTT_CLIENT_REQUEST_ONGOING;
handle_error:
if (message) {
aws_mem_release(message->allocator, message);
}
if (!task_arg->tree_updated) {
aws_mqtt_topic_tree_transaction_roll_back(&task_arg->connection->thread_data.subscriptions, &transaction);
}
aws_array_list_clean_up(&transaction);
return AWS_MQTT_CLIENT_REQUEST_ERROR;
}