in source/client_channel_handler.c [175:245]
static int s_packet_handler_publish(
struct aws_mqtt_client_connection *connection,
struct aws_byte_cursor message_cursor) {
/* TODO: need to handle the QoS 2 message to avoid processing the message a second time */
struct aws_mqtt_packet_publish publish;
if (aws_mqtt_packet_publish_decode(&message_cursor, &publish)) {
return AWS_OP_ERR;
}
aws_mqtt_topic_tree_publish(&connection->thread_data.subscriptions, &publish);
bool dup = aws_mqtt_packet_publish_get_dup(&publish);
enum aws_mqtt_qos qos = aws_mqtt_packet_publish_get_qos(&publish);
bool retain = aws_mqtt_packet_publish_get_retain(&publish);
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_any_publish, &publish.topic_name, &publish.payload, dup, qos, retain);
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT,
"id=%p: publish received with msg id=%" PRIu16 " dup=%d qos=%d retain=%d payload-size=%zu topic=" PRInSTR,
(void *)connection,
publish.packet_identifier,
dup,
qos,
retain,
publish.payload.len,
AWS_BYTE_CURSOR_PRI(publish.topic_name));
struct aws_mqtt_packet_ack puback;
AWS_ZERO_STRUCT(puback);
/* Switch on QoS flags (bits 1 & 2) */
switch (qos) {
case AWS_MQTT_QOS_AT_MOST_ONCE:
AWS_LOGF_TRACE(
AWS_LS_MQTT_CLIENT, "id=%p: received publish QOS is 0, not sending puback", (void *)connection);
/* No more communication necessary */
break;
case AWS_MQTT_QOS_AT_LEAST_ONCE:
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: received publish QOS is 1, sending puback", (void *)connection);
aws_mqtt_packet_puback_init(&puback, publish.packet_identifier);
break;
case AWS_MQTT_QOS_EXACTLY_ONCE:
AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: received publish QOS is 2, sending pubrec", (void *)connection);
aws_mqtt_packet_pubrec_init(&puback, publish.packet_identifier);
break;
default:
/* Impossible to hit this branch. QoS value is checked when decoding */
AWS_FATAL_ASSERT(0);
break;
}
if (puback.packet_identifier) {
struct aws_io_message *message = mqtt_get_message_for_packet(connection, &puback.fixed_header);
if (!message) {
return AWS_OP_ERR;
}
if (aws_mqtt_packet_ack_encode(&message->message_data, &puback)) {
aws_mem_release(message->allocator, message);
return AWS_OP_ERR;
}
if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
aws_mem_release(message->allocator, message);
return AWS_OP_ERR;
}
}
return AWS_OP_SUCCESS;
}