in source/client.c [2840:2908]
uint16_t aws_mqtt_client_connection_publish(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *topic,
enum aws_mqtt_qos qos,
bool retain,
const struct aws_byte_cursor *payload,
aws_mqtt_op_complete_fn *on_complete,
void *userdata) {
AWS_PRECONDITION(connection);
if (!aws_mqtt_is_valid_topic(topic)) {
aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
return 0;
}
struct publish_task_arg *arg = aws_mem_calloc(connection->allocator, 1, sizeof(struct publish_task_arg));
if (!arg) {
return 0;
}
arg->connection = connection;
arg->topic_string = aws_string_new_from_array(connection->allocator, topic->ptr, topic->len);
arg->topic = aws_byte_cursor_from_string(arg->topic_string);
arg->qos = qos;
arg->retain = retain;
if (aws_byte_buf_init_copy_from_cursor(&arg->payload_buf, connection->allocator, *payload)) {
goto handle_error;
}
arg->payload = aws_byte_cursor_from_buf(&arg->payload_buf);
arg->on_complete = on_complete;
arg->userdata = userdata;
bool retry = qos == AWS_MQTT_QOS_AT_MOST_ONCE;
uint16_t packet_id = mqtt_create_request(connection, &s_publish_send, arg, &s_publish_complete, arg, retry);
if (packet_id == 0) {
/* bummer, we failed to make a new request */
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"id=%p: Failed starting publish to topic " PRInSTR ",error %d (%s)",
(void *)connection,
AWS_BYTE_CURSOR_PRI(*topic),
aws_last_error(),
aws_error_name(aws_last_error()));
goto handle_error;
}
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: Starting publish %" PRIu16 " to topic " PRInSTR,
(void *)connection,
packet_id,
AWS_BYTE_CURSOR_PRI(*topic));
return packet_id;
handle_error:
/* we know arg is valid, topic_string may or may not be valid */
if (arg->topic_string) {
aws_string_destroy(arg->topic_string);
}
aws_byte_buf_clean_up(&arg->payload_buf);
aws_mem_release(connection->allocator, arg);
return 0;
}