in source/client.c [418:592]
static void s_mqtt_client_init(
struct aws_client_bootstrap *bootstrap,
int error_code,
struct aws_channel *channel,
void *user_data) {
(void)bootstrap;
struct aws_io_message *message = NULL;
/* Setup callback contract is: if error_code is non-zero then channel is NULL. */
AWS_FATAL_ASSERT((error_code != 0) == (channel == NULL));
struct aws_mqtt_client_connection *connection = user_data;
if (error_code != AWS_OP_SUCCESS) {
/* client shutdown already handles this case, so just call that. */
s_mqtt_client_shutdown(bootstrap, error_code, channel, user_data);
return;
}
/* user requested disconnect before the channel has been set up. Stop installing the slot and sending CONNECT. */
bool failed_create_slot = false;
{ /* BEGIN CRITICAL SECTION */
mqtt_connection_lock_synced_data(connection);
if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) {
/* It only happens when the user request disconnect during reconnecting, we don't need to fire any callback.
* The on_disconnect will be invoked as channel finish shutting down. */
mqtt_connection_unlock_synced_data(connection);
aws_channel_shutdown(channel, AWS_ERROR_SUCCESS);
return;
}
/* Create the slot */
connection->slot = aws_channel_slot_new(channel);
if (!connection->slot) {
failed_create_slot = true;
}
mqtt_connection_unlock_synced_data(connection);
} /* END CRITICAL SECTION */
/* intall the slot and handler */
if (failed_create_slot) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"id=%p: Failed to create new slot, something has gone horribly wrong, error %d (%s).",
(void *)connection,
aws_last_error(),
aws_error_name(aws_last_error()));
goto handle_error;
}
if (aws_channel_slot_insert_end(channel, connection->slot)) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"id=%p: Failed to insert slot into channel %p, error %d (%s).",
(void *)connection,
(void *)channel,
aws_last_error(),
aws_error_name(aws_last_error()));
goto handle_error;
}
if (aws_channel_slot_set_handler(connection->slot, &connection->handler)) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"id=%p: Failed to set MQTT handler into slot on channel %p, error %d (%s).",
(void *)connection,
(void *)channel,
aws_last_error(),
aws_error_name(aws_last_error()));
goto handle_error;
}
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT, "id=%p: Connection successfully opened, sending CONNECT packet", (void *)connection);
struct aws_channel_task *connack_task = aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_channel_task));
if (!connack_task) {
AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to allocate timeout task.", (void *)connection);
goto handle_error;
}
aws_channel_task_init(connack_task, s_connack_received_timeout, connection, "mqtt_connack_timeout");
uint64_t now = 0;
if (aws_channel_current_clock_time(channel, &now)) {
AWS_LOGF_ERROR(
AWS_LS_MQTT_CLIENT,
"static: Failed to setting MQTT handler into slot on channel %p, error %d (%s).",
(void *)channel,
aws_last_error(),
aws_error_name(aws_last_error()));
goto handle_error;
}
now += connection->ping_timeout_ns;
aws_channel_schedule_task_future(channel, connack_task, now);
/* Send the connect packet */
struct aws_mqtt_packet_connect connect;
aws_mqtt_packet_connect_init(
&connect,
aws_byte_cursor_from_buf(&connection->client_id),
connection->clean_session,
connection->keep_alive_time_secs);
if (connection->will.topic.buffer) {
/* Add will if present */
struct aws_byte_cursor topic_cur = aws_byte_cursor_from_buf(&connection->will.topic);
struct aws_byte_cursor payload_cur = aws_byte_cursor_from_buf(&connection->will.payload);
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: Adding will to connection on " PRInSTR " with payload " PRInSTR,
(void *)connection,
AWS_BYTE_CURSOR_PRI(topic_cur),
AWS_BYTE_CURSOR_PRI(payload_cur));
aws_mqtt_packet_connect_add_will(
&connect, topic_cur, connection->will.qos, connection->will.retain, payload_cur);
}
if (connection->username) {
struct aws_byte_cursor username_cur = aws_byte_cursor_from_string(connection->username);
AWS_LOGF_DEBUG(
AWS_LS_MQTT_CLIENT,
"id=%p: Adding username " PRInSTR " to connection",
(void *)connection,
AWS_BYTE_CURSOR_PRI(username_cur))
struct aws_byte_cursor password_cur = {
.ptr = NULL,
.len = 0,
};
if (connection->password) {
password_cur = aws_byte_cursor_from_string(connection->password);
}
aws_mqtt_packet_connect_add_credentials(&connect, username_cur, password_cur);
}
message = mqtt_get_message_for_packet(connection, &connect.fixed_header);
if (!message) {
AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to get message from pool", (void *)connection);
goto handle_error;
}
if (aws_mqtt_packet_connect_encode(&message->message_data, &connect)) {
AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to encode CONNECT packet", (void *)connection);
goto handle_error;
}
if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to send encoded CONNECT packet upstream", (void *)connection);
goto handle_error;
}
return;
handle_error:
MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, aws_last_error(), 0, false);
aws_channel_shutdown(channel, aws_last_error());
if (message) {
aws_mem_release(message->allocator, message);
}
}