in source/client.c [2273:2351]
static enum aws_mqtt_client_request_state s_resubscribe_send(
uint16_t packet_id,
bool is_first_attempt,
void *userdata) {
struct subscribe_task_arg *task_arg = userdata;
bool initing_packet = task_arg->subscribe.fixed_header.packet_type == 0;
struct aws_io_message *message = NULL;
size_t sub_count = aws_mqtt_topic_tree_get_sub_count(&task_arg->connection->thread_data.subscriptions);
if (sub_count == 0) {
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Not subscribed to any topics. Resubscribe is unnecessary, no packet will be sent.",
(void *)task_arg->connection);
return AWS_MQTT_CLIENT_REQUEST_COMPLETE;
}
if (aws_array_list_init_dynamic(&task_arg->topics, task_arg->connection->allocator, sub_count, sizeof(void *))) {
goto handle_error;
}
aws_mqtt_topic_tree_iterate(&task_arg->connection->thread_data.subscriptions, s_reconnect_resub_iterator, task_arg);
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: Attempting send of resubscribe %" PRIu16 " (%s)",
(void *)task_arg->connection,
packet_id,
is_first_attempt ? "first attempt" : "resend");
if (initing_packet) {
/* Init the subscribe packet */
if (aws_mqtt_packet_subscribe_init(&task_arg->subscribe, task_arg->connection->allocator, packet_id)) {
return AWS_MQTT_CLIENT_REQUEST_ERROR;
}
const size_t num_topics = aws_array_list_length(&task_arg->topics);
if (num_topics <= 0) {
aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
return AWS_MQTT_CLIENT_REQUEST_ERROR;
}
for (size_t i = 0; i < num_topics; ++i) {
struct subscribe_task_topic *topic = NULL;
aws_array_list_get_at(&task_arg->topics, &topic, i);
AWS_ASSUME(topic); /* We know we're within bounds */
if (aws_mqtt_packet_subscribe_add_topic(&task_arg->subscribe, topic->request.topic, topic->request.qos)) {
goto handle_error;
}
}
}
message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->subscribe.fixed_header);
if (!message) {
goto handle_error;
}
if (aws_mqtt_packet_subscribe_encode(&message->message_data, &task_arg->subscribe)) {
goto handle_error;
}
/* This is not necessarily a fatal error; if the send fails, it'll just retry. Still need to clean up though. */
if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
aws_mem_release(message->allocator, message);
}
return AWS_MQTT_CLIENT_REQUEST_ONGOING;
handle_error:
if (message) {
aws_mem_release(message->allocator, message);
}
return AWS_MQTT_CLIENT_REQUEST_ERROR;
}