in source/client.c [1852:1955]
/**
 * Subscribe to several topic filters with a single request.
 *
 * Every element of topic_filters is read as a struct aws_mqtt_topic_subscription.
 * Each one is validated with aws_mqtt_is_valid_topic_filter() and copied into a
 * freshly allocated subscribe_task_topic whose topic cursor is re-pointed at an
 * owned aws_string, so the copies do not alias the caller's topic buffers.
 *
 * \param connection     Connection to subscribe on; its allocator is used for all
 *                       allocations made here. Must not be NULL.
 * \param topic_filters  List of struct aws_mqtt_topic_subscription. Must not be NULL.
 * \param on_suback      Stored in the task argument as the multi-topic suback callback.
 * \param on_suback_ud   Stored in the task argument as the callback's user data.
 *
 * \return The packet id returned by mqtt_create_request() on success, or 0 on
 *         failure. On failure everything allocated by this function is released.
 *         An invalid topic filter raises AWS_ERROR_MQTT_INVALID_TOPIC.
 */
uint16_t aws_mqtt_client_connection_subscribe_multiple(
    struct aws_mqtt_client_connection *connection,
    const struct aws_array_list *topic_filters,
    aws_mqtt_suback_multi_fn *on_suback,
    void *on_suback_ud) {

    AWS_PRECONDITION(connection);
    AWS_PRECONDITION(topic_filters);

    struct subscribe_task_arg *task_arg = aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_arg));
    if (!task_arg) {
        return 0;
    }

    task_arg->connection = connection;
    task_arg->on_suback.multi = on_suback;
    task_arg->on_suback_ud = on_suback_ud;

    /* NOTE(review): an empty topic_filters list is not rejected here and still reaches
     * mqtt_create_request() - confirm that is the intended behavior. */
    const size_t num_topics = aws_array_list_length(topic_filters);

    /* The list stores pointers to subscribe_task_topic, one per requested filter. */
    if (aws_array_list_init_dynamic(&task_arg->topics, connection->allocator, num_topics, sizeof(void *))) {
        goto handle_error;
    }

    AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Starting multi-topic subscribe", (void *)connection);

    for (size_t i = 0; i < num_topics; ++i) {

        struct aws_mqtt_topic_subscription *request = NULL;
        /* Do not dereference request unless the lookup actually succeeded. */
        if (aws_array_list_get_at_ptr(topic_filters, (void **)&request, i)) {
            goto handle_error;
        }

        if (!aws_mqtt_is_valid_topic_filter(&request->topic)) {
            aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
            goto handle_error;
        }

        struct subscribe_task_topic *task_topic =
            aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_topic));
        if (!task_topic) {
            goto handle_error;
        }
        aws_ref_count_init(&task_topic->ref_count, task_topic, (aws_simple_completion_callback *)s_task_topic_clean_up);

        task_topic->connection = connection;
        task_topic->request = *request;

        task_topic->filter = aws_string_new_from_array(
            connection->allocator, task_topic->request.topic.ptr, task_topic->request.topic.len);
        if (!task_topic->filter) {
            /* Not yet in the list, so handle_error would not see it: free it here. */
            aws_mem_release(connection->allocator, task_topic);
            goto handle_error;
        }

        /* Update request topic cursor to refer to owned string */
        task_topic->request.topic = aws_byte_cursor_from_string(task_topic->filter);

        AWS_LOGF_DEBUG(
            AWS_LS_MQTT_CLIENT,
            "id=%p: Adding topic \"" PRInSTR "\"",
            (void *)connection,
            AWS_BYTE_CURSOR_PRI(task_topic->request.topic));

        /* Push into the list. If that fails the topic is owned by nobody, so release it
         * here rather than leaking it and silently subscribing to fewer topics. */
        if (aws_array_list_push_back(&task_arg->topics, &task_topic)) {
            aws_string_destroy(task_topic->filter);
            aws_mem_release(connection->allocator, task_topic);
            goto handle_error;
        }
    }

    /* task_arg is passed as the user data of both the send and the completion callback. */
    uint16_t packet_id = mqtt_create_request(
        task_arg->connection, &s_subscribe_send, task_arg, &s_subscribe_complete, task_arg, false /* noRetry */);

    if (packet_id == 0) {
        AWS_LOGF_ERROR(
            AWS_LS_MQTT_CLIENT,
            "id=%p: Failed to kick off multi-topic subscribe, with error %s",
            (void *)connection,
            aws_error_debug_str(aws_last_error()));
        goto handle_error;
    }

    AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Sending multi-topic subscribe %" PRIu16, (void *)connection, packet_id);
    return packet_id;

handle_error:

    /* task_arg is always non-NULL here: the only NULL case returned before any goto. */
    if (task_arg->topics.data) {

        /* Release every topic that made it into the list, then the list itself. */
        const size_t num_added_topics = aws_array_list_length(&task_arg->topics);
        for (size_t i = 0; i < num_added_topics; ++i) {

            struct subscribe_task_topic *task_topic = NULL;
            aws_array_list_get_at(&task_arg->topics, (void **)&task_topic, i);
            AWS_ASSUME(task_topic);

            aws_string_destroy(task_topic->filter);
            aws_mem_release(connection->allocator, task_topic);
        }

        aws_array_list_clean_up(&task_arg->topics);
    }

    aws_mem_release(connection->allocator, task_arg);
    return 0;
}