in source/client.c [2718:2809]
/*
 * Send callback for a PUBLISH request.
 *
 * On the first attempt the PUBLISH packet stored in the task arg is initialized; on later attempts the
 * already-initialized packet is reused as-is. The packet headers are encoded into a channel message, and the
 * payload is then written into that message and, if it does not fit, into as many follow-up messages as needed.
 * Each message is handed to the channel in the write direction as soon as it has been filled.
 *
 * packet_id:        id assigned to this request. Forced to 0 for QoS 0 publishes.
 * is_first_attempt: true for the initial send, false for a resend.
 * userdata:         the struct publish_task_arg describing this publish.
 *
 * Returns:
 *  - AWS_MQTT_CLIENT_REQUEST_ERROR    if the packet could not be built, a message could not be acquired or
 *                                     filled, the timeout task could not be scheduled, or a QoS 0 message could
 *                                     not be handed to the channel.
 *  - AWS_MQTT_CLIENT_REQUEST_ONGOING  for QoS > 0, both after a successful send and when handing a message to
 *                                     the channel failed.
 *  - AWS_MQTT_CLIENT_REQUEST_COMPLETE for QoS 0 once every message has been handed to the channel.
 */
static enum aws_mqtt_client_request_state s_publish_send(uint16_t packet_id, bool is_first_attempt, void *userdata) {
    struct publish_task_arg *task_arg = userdata;
    struct aws_mqtt_client_connection *connection = task_arg->connection;

    AWS_LOGF_TRACE(
        AWS_LS_MQTT_CLIENT,
        "id=%p: Attempting send of publish %" PRIu16 " %s",
        (void *)task_arg->connection,
        packet_id,
        is_first_attempt ? "first attempt" : "resend");

    bool is_qos_0 = task_arg->qos == AWS_MQTT_QOS_AT_MOST_ONCE;
    if (is_qos_0) {
        packet_id = 0;
    }

    if (is_first_attempt) {
        /*
         * NOTE(review): the dup argument below is always false because this branch only runs on the first attempt,
         * and nothing in this function updates the packet on a resend. Confirm whether the DUP flag is expected
         * to be set elsewhere for resent publishes.
         */
        if (aws_mqtt_packet_publish_init(
                &task_arg->publish,
                task_arg->retain,
                task_arg->qos,
                !is_first_attempt,
                task_arg->topic,
                packet_id,
                task_arg->payload)) {
            return AWS_MQTT_CLIENT_REQUEST_ERROR;
        }
    }

    struct aws_io_message *message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->publish.fixed_header);
    if (!message) {
        return AWS_MQTT_CLIENT_REQUEST_ERROR;
    }

    /* Encode the headers, and everything but the payload */
    if (aws_mqtt_packet_publish_encode_headers(&message->message_data, &task_arg->publish)) {
        /* The message has not been handed to the channel, so it must be released here. */
        aws_mem_release(message->allocator, message);
        return AWS_MQTT_CLIENT_REQUEST_ERROR;
    }

    /* Write the payload, spilling over into additional messages when it does not fit in the current one. */
    struct aws_byte_cursor payload_cur = task_arg->payload;
    for (;;) {
        const size_t left_in_message = message->message_data.capacity - message->message_data.len;
        const size_t to_write = payload_cur.len < left_in_message ? payload_cur.len : left_in_message;

        if (to_write) {
            /* Write this chunk */
            struct aws_byte_cursor to_write_cur = aws_byte_cursor_advance(&payload_cur, to_write);
            AWS_ASSERT(to_write_cur.ptr); /* to_write is guaranteed to be inside the bounds of payload_cur */
            if (!aws_byte_buf_write_from_whole_cursor(&message->message_data, to_write_cur)) {
                aws_mem_release(message->allocator, message);
                return AWS_MQTT_CLIENT_REQUEST_ERROR;
            }
        }

        if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
            aws_mem_release(message->allocator, message);
            /* If it's QoS 0, tell the user that the message hasn't been sent; otherwise the message will be resent
             * once the connection is back. */
            return is_qos_0 ? AWS_MQTT_CLIENT_REQUEST_ERROR : AWS_MQTT_CLIENT_REQUEST_ONGOING;
        }

        if (!payload_cur.len) {
            break;
        }

        /* There's still payload left: get a new message and go around again. */
        message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->publish.fixed_header);
        if (!message) {
            /* Acquisition can fail (it is checked above as well); don't dereference a NULL message. */
            return AWS_MQTT_CLIENT_REQUEST_ERROR;
        }
    }

    if (!is_qos_0 && connection->operation_timeout_ns != UINT64_MAX) {
        /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
         * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't
         * properly fire the on_completion callbacks. */
        struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(connection, packet_id);
        if (!timeout_task_arg) {
            return AWS_MQTT_CLIENT_REQUEST_ERROR;
        }

        /*
         * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
         * "wins", does its logic, and then breaks the connection between the two.
         */
        task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
        timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
    }

    /* If QoS == 0, there will be no ack, so consider the request done now. */
    return is_qos_0 ? AWS_MQTT_CLIENT_REQUEST_COMPLETE : AWS_MQTT_CLIENT_REQUEST_ONGOING;
}