in source/device_defender.c [1039:1175]
/**
 * Allocates and initializes a Device Defender reporting task, optionally
 * subscribes to the report accepted/rejected MQTT topics, and schedules the
 * task for its first run on the given event loop.
 *
 * @param task_out   Receives the new task on success. Left untouched on failure.
 * @param config     Task configuration; it is copied into the task. Only
 *                   AWS_IDDRF_JSON is accepted as report format.
 * @param publish_fn Publish callback stored on the task.
 * @param connection MQTT connection used for the accepted/rejected topic
 *                   subscriptions. May be NULL, in which case no topics are
 *                   built and no subscriptions are made.
 * @param event_loop Event loop the reporting task is scheduled on.
 *
 * @return AWS_OP_SUCCESS once the task has been scheduled, AWS_OP_ERR
 *         otherwise. On failure any partially constructed task is handed to
 *         s_defender_task_clean_up().
 *
 * Note: a failed subscribe (packet id 0) is logged but does not fail task
 * creation.
 */
static int s_defender_task_create(
    struct aws_iotdevice_defender_task **task_out,
    const struct aws_iotdevice_defender_task_config *config,
    aws_iotdevice_defender_publish_fn *publish_fn,
    struct aws_mqtt_client_connection *connection,
    struct aws_event_loop *event_loop) {
    AWS_PRECONDITION(task_out != NULL);
    AWS_PRECONDITION(config != NULL);
    AWS_PRECONDITION(publish_fn != NULL);
    AWS_PRECONDITION(event_loop != NULL);

    int return_code = AWS_OP_ERR;
    struct aws_allocator *allocator = config->allocator;
    struct aws_iotdevice_defender_task *defender_task = NULL;

    /* Reject unsupported formats before allocating anything. */
    if (config->report_format != AWS_IDDRF_JSON) {
        AWS_LOGF_ERROR(AWS_LS_IOTDEVICE_DEFENDER_TASK, "Unsupported DeviceDefender detect report format detected.");
        aws_raise_error(AWS_ERROR_IOTDEVICE_DEFENDER_UNSUPPORTED_REPORT_FORMAT);
        goto cleanup;
    }

    defender_task = aws_mem_calloc(allocator, 1, sizeof(*defender_task));
    if (defender_task == NULL) {
        aws_raise_error(aws_last_error());
        goto cleanup;
    }

    defender_task->allocator = allocator;
    defender_task->event_loop = event_loop;
    defender_task->publish_fn = publish_fn;
    defender_task->connection = connection;
    /* No previous network sample exists yet, so deltas start from zero. */
    defender_task->previous_net_xfer.bytes_in = 0;
    defender_task->previous_net_xfer.bytes_out = 0;
    defender_task->previous_net_xfer.packets_in = 0;
    defender_task->previous_net_xfer.packets_out = 0;
    defender_task->has_previous_net_xfer = false;
    defender_task->is_task_canceled = false;
    aws_ref_count_init(&defender_task->ref_count, defender_task, s_delete_dd_task);

    if (AWS_OP_SUCCESS != aws_mutex_init(&defender_task->task_cancel_mutex)) {
        goto cleanup;
    }

    if (AWS_OP_SUCCESS != aws_condition_variable_init(&defender_task->cv_task_canceled)) {
        goto cleanup;
    }

    if (AWS_OP_SUCCESS != s_copy_task_config(&defender_task->config, config)) {
        goto cleanup;
    }

    if (!aws_array_list_is_valid(&defender_task->config.custom_metrics)) {
        aws_raise_error(AWS_ERROR_IOTDEVICE_DEFENDER_INVALID_TASK_CONFIG);
        goto cleanup;
    }

    if (connection != NULL) {
        /* Topics have the shape "$aws/things/<thing_name>/defender/metrics/json[/accepted|/rejected]". */
        struct aws_byte_cursor prefix = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("$aws/things/");
        struct aws_byte_cursor publish_suffix = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("/defender/metrics/json");
        struct aws_byte_cursor accepted_suffix =
            AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("/defender/metrics/json/accepted");
        struct aws_byte_cursor rejected_suffix =
            AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("/defender/metrics/json/rejected");

        defender_task->publish_report_topic_name = s_build_topic(allocator, config->thing_name, prefix, publish_suffix);
        defender_task->report_accepted_topic_name =
            s_build_topic(allocator, config->thing_name, prefix, accepted_suffix);
        defender_task->report_rejected_topic_name =
            s_build_topic(allocator, config->thing_name, prefix, rejected_suffix);

        /* Subscribe to the "accepted" response topic; the task itself is the callback userdata. */
        const struct aws_byte_cursor accepted_cursor =
            aws_byte_cursor_from_string(defender_task->report_accepted_topic_name);
        uint16_t sub_accepted_packet_id = aws_mqtt_client_connection_subscribe(
            defender_task->connection,
            &accepted_cursor,
            AWS_MQTT_QOS_AT_LEAST_ONCE,
            &s_on_report_response_accepted,
            defender_task,
            NULL,
            s_mqtt_on_suback,
            defender_task);
        if (sub_accepted_packet_id != 0) {
            AWS_LOGF_DEBUG(
                AWS_LS_IOTDEVICE_DEFENDER_TASK,
                "id=%p: subscription packet_id [%d] for accepted topic %s",
                (void *)defender_task,
                sub_accepted_packet_id,
                aws_string_c_str(defender_task->report_accepted_topic_name));
        } else {
            AWS_LOGF_ERROR(
                AWS_LS_IOTDEVICE_DEFENDER_TASK,
                "id=%p: Failed to send subscription packet for topic: %s",
                (void *)defender_task,
                aws_string_c_str(defender_task->report_accepted_topic_name));
        }

        /* Subscribe to the "rejected" response topic. */
        const struct aws_byte_cursor rejected_cursor =
            aws_byte_cursor_from_string(defender_task->report_rejected_topic_name);
        uint16_t sub_rejected_packet_id = aws_mqtt_client_connection_subscribe(
            defender_task->connection,
            &rejected_cursor,
            AWS_MQTT_QOS_AT_LEAST_ONCE,
            &s_on_report_response_rejected,
            defender_task,
            NULL,
            s_mqtt_on_suback,
            defender_task);
        /* Check the rejected subscription's own packet id, not the accepted one. */
        if (sub_rejected_packet_id != 0) {
            AWS_LOGF_TRACE(
                AWS_LS_IOTDEVICE_DEFENDER_TASK,
                "id=%p: subscription packet_id [%d] for rejected topic %s",
                (void *)defender_task,
                sub_rejected_packet_id,
                aws_string_c_str(defender_task->report_rejected_topic_name));
        } else {
            AWS_LOGF_ERROR(
                AWS_LS_IOTDEVICE_DEFENDER_TASK,
                "id=%p: Failed to send subscription packet for rejected topic: %s",
                (void *)defender_task,
                aws_string_c_str(defender_task->report_rejected_topic_name));
        }
    }

    aws_task_init(&defender_task->task, s_reporting_task_fn, defender_task, "DeviceDefenderReportTask");
    /* Publish the handle before scheduling the first run. */
    *task_out = defender_task;

    AWS_LOGF_TRACE(
        AWS_LS_IOTDEVICE_DEFENDER_TASK, "id=%p: Scheduling defender task for first run", (void *)defender_task);
    aws_event_loop_schedule_task_now(event_loop, &defender_task->task);
    return_code = AWS_OP_SUCCESS;

cleanup:
    /* Only tear down on failure; on success ownership has passed to the caller via task_out. */
    if (return_code == AWS_OP_ERR && defender_task != NULL) {
        s_defender_task_clean_up(defender_task);
    }

    return return_code;
}