in source/client.c [1991:2088]
uint16_t aws_mqtt_client_connection_subscribe(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *topic_filter,
enum aws_mqtt_qos qos,
aws_mqtt_client_publish_received_fn *on_publish,
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_fn *on_suback,
void *on_suback_ud) {
AWS_PRECONDITION(connection);
if (!aws_mqtt_is_valid_topic_filter(topic_filter)) {
aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
return 0;
}
/* Because we know we're only going to have 1 topic, we can cheat and allocate the array_list in the same block as
* the task argument. */
void *task_topic_storage = NULL;
struct subscribe_task_topic *task_topic = NULL;
struct subscribe_task_arg *task_arg = aws_mem_acquire_many(
connection->allocator,
2,
&task_arg,
sizeof(struct subscribe_task_arg),
&task_topic_storage,
sizeof(struct subscribe_task_topic *));
if (!task_arg) {
goto handle_error;
}
AWS_ZERO_STRUCT(*task_arg);
task_arg->connection = connection;
task_arg->on_suback.single = on_suback;
task_arg->on_suback_ud = on_suback_ud;
/* It stores the pointer */
aws_array_list_init_static(&task_arg->topics, task_topic_storage, 1, sizeof(void *));
/* Allocate the topic and push into the list */
task_topic = aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_topic));
if (!task_topic) {
goto handle_error;
}
aws_ref_count_init(&task_topic->ref_count, task_topic, (aws_simple_completion_callback *)s_task_topic_clean_up);
aws_array_list_push_back(&task_arg->topics, &task_topic);
task_topic->filter = aws_string_new_from_array(connection->allocator, topic_filter->ptr, topic_filter->len);
if (!task_topic->filter) {
goto handle_error;
}
task_topic->connection = connection;
task_topic->request.topic = aws_byte_cursor_from_string(task_topic->filter);
task_topic->request.qos = qos;
task_topic->request.on_publish = on_publish;
task_topic->request.on_cleanup = on_ud_cleanup;
task_topic->request.on_publish_ud = on_publish_ud;
uint16_t packet_id = mqtt_create_request(
task_arg->connection, &s_subscribe_send, task_arg, &s_subscribe_single_complete, task_arg, false /* noRetry */);
if (packet_id == 0) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"id=%p: Failed to start subscribe on topic " PRInSTR " with error %s",
(void *)connection,
AWS_BYTE_CURSOR_PRI(task_topic->request.topic),
aws_error_debug_str(aws_last_error()));
goto handle_error;
}
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: Starting subscribe %" PRIu16 " on topic " PRInSTR,
(void *)connection,
packet_id,
AWS_BYTE_CURSOR_PRI(task_topic->request.topic));
return packet_id;
handle_error:
if (task_topic) {
if (task_topic->filter) {
aws_string_destroy(task_topic->filter);
}
aws_mem_release(connection->allocator, task_topic);
}
if (task_arg) {
aws_mem_release(connection->allocator, task_arg);
}
return 0;
}