in source/client.c [2465:2559]
static enum aws_mqtt_client_request_state s_unsubscribe_send(
uint16_t packet_id,
bool is_first_attempt,
void *userdata) {
(void)is_first_attempt;
struct unsubscribe_task_arg *task_arg = userdata;
struct aws_io_message *message = NULL;
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Attempting send of unsubscribe %" PRIu16 " %s",
(void *)task_arg->connection,
packet_id,
is_first_attempt ? "first attempt" : "resend");
static const size_t num_topics = 1;
AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, num_topics * aws_mqtt_topic_tree_action_size);
struct aws_array_list transaction;
aws_array_list_init_static(&transaction, transaction_buf, num_topics, aws_mqtt_topic_tree_action_size);
if (!task_arg->tree_updated) {
struct subscribe_task_topic *topic;
if (aws_mqtt_topic_tree_transaction_remove(
&task_arg->connection->thread_data.subscriptions, &transaction, &task_arg->filter, (void **)&topic)) {
goto handle_error;
}
task_arg->is_local = topic ? topic->is_local : false;
}
if (!task_arg->is_local) {
if (task_arg->unsubscribe.fixed_header.packet_type == 0) {
/* If unsubscribe packet is uninitialized, init it */
if (aws_mqtt_packet_unsubscribe_init(&task_arg->unsubscribe, task_arg->connection->allocator, packet_id)) {
goto handle_error;
}
if (aws_mqtt_packet_unsubscribe_add_topic(&task_arg->unsubscribe, task_arg->filter)) {
goto handle_error;
}
}
message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->unsubscribe.fixed_header);
if (!message) {
goto handle_error;
}
if (aws_mqtt_packet_unsubscribe_encode(&message->message_data, &task_arg->unsubscribe)) {
goto handle_error;
}
if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
goto handle_error;
}
/* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
* invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
* fire fire the on_completion callbacks. */
struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(task_arg->connection, packet_id);
if (!timeout_task_arg) {
return AWS_MQTT_CLIENT_REQUEST_ERROR;
}
/*
* Set up mutual references between the operation task args and the timeout task args. Whoever runs first
* "wins", does its logic, and then breaks the connection between the two.
*/
task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
}
if (!task_arg->tree_updated) {
aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
task_arg->tree_updated = true;
}
aws_array_list_clean_up(&transaction);
/* If the subscribe is local-only, don't wait for a SUBACK to come back. */
return task_arg->is_local ? AWS_MQTT_CLIENT_REQUEST_COMPLETE : AWS_MQTT_CLIENT_REQUEST_ONGOING;
handle_error:
if (message) {
aws_mem_release(message->allocator, message);
}
if (!task_arg->tree_updated) {
aws_mqtt_topic_tree_transaction_roll_back(&task_arg->connection->thread_data.subscriptions, &transaction);
}
aws_array_list_clean_up(&transaction);
return AWS_MQTT_CLIENT_REQUEST_ERROR;
}