static int s_defender_task_create()

in source/device_defender.c [1039:1175]


static int s_defender_task_create(
    struct aws_iotdevice_defender_task **task_out,
    const struct aws_iotdevice_defender_task_config *config,
    aws_iotdevice_defender_publish_fn *publish_fn,
    struct aws_mqtt_client_connection *connection,
    struct aws_event_loop *event_loop) {
    AWS_PRECONDITION(task_out != NULL);
    AWS_PRECONDITION(config != NULL);
    AWS_PRECONDITION(publish_fn != NULL);
    AWS_PRECONDITION(event_loop != NULL);

    int return_code = AWS_OP_ERR;
    struct aws_allocator *allocator = config->allocator;
    struct aws_iotdevice_defender_task *defender_task = NULL;

    if (config->report_format != AWS_IDDRF_JSON) {
        AWS_LOGF_ERROR(AWS_LS_IOTDEVICE_DEFENDER_TASK, "Unsupported DeviceDefender detect report format detected.");
        aws_raise_error(AWS_ERROR_IOTDEVICE_DEFENDER_UNSUPPORTED_REPORT_FORMAT);
        goto cleanup;
    }

    defender_task =
        (struct aws_iotdevice_defender_task *)aws_mem_calloc(allocator, 1, sizeof(struct aws_iotdevice_defender_task));
    if (defender_task == NULL) {
        aws_raise_error(aws_last_error());
        goto cleanup;
    }

    defender_task->allocator = allocator;
    defender_task->event_loop = event_loop;
    defender_task->publish_fn = publish_fn;
    defender_task->connection = connection;
    defender_task->previous_net_xfer.bytes_in = 0;
    defender_task->previous_net_xfer.bytes_out = 0;
    defender_task->previous_net_xfer.packets_in = 0;
    defender_task->previous_net_xfer.packets_out = 0;
    defender_task->has_previous_net_xfer = false;
    defender_task->is_task_canceled = false;
    aws_ref_count_init(&defender_task->ref_count, defender_task, s_delete_dd_task);

    if (AWS_OP_SUCCESS != aws_mutex_init(&defender_task->task_cancel_mutex)) {
        goto cleanup;
    }
    if (AWS_OP_SUCCESS != aws_condition_variable_init(&defender_task->cv_task_canceled)) {
        goto cleanup;
    }

    if (AWS_OP_SUCCESS != s_copy_task_config(&defender_task->config, config)) {
        goto cleanup;
    }
    if (!aws_array_list_is_valid(&defender_task->config.custom_metrics)) {
        aws_raise_error(AWS_ERROR_IOTDEVICE_DEFENDER_INVALID_TASK_CONFIG);
        goto cleanup;
    }

    if (connection != NULL) {
        struct aws_byte_cursor prefix = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("$aws/things/");
        struct aws_byte_cursor publish_suffix = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("/defender/metrics/json");
        struct aws_byte_cursor accepted_suffix =
            AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("/defender/metrics/json/accepted");
        struct aws_byte_cursor rejected_suffix =
            AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("/defender/metrics/json/rejected");
        defender_task->publish_report_topic_name = s_build_topic(allocator, config->thing_name, prefix, publish_suffix);
        defender_task->report_accepted_topic_name =
            s_build_topic(allocator, config->thing_name, prefix, accepted_suffix);
        defender_task->report_rejected_topic_name =
            s_build_topic(allocator, config->thing_name, prefix, rejected_suffix);

        const struct aws_byte_cursor accepted_cursor =
            aws_byte_cursor_from_string(defender_task->report_accepted_topic_name);
        uint16_t sub_accepted_packet_id = aws_mqtt_client_connection_subscribe(
            defender_task->connection,
            &accepted_cursor,
            AWS_MQTT_QOS_AT_LEAST_ONCE,
            &s_on_report_response_accepted,
            defender_task,
            NULL,
            s_mqtt_on_suback,
            defender_task);
        if (sub_accepted_packet_id != 0) {
            AWS_LOGF_DEBUG(
                AWS_LS_IOTDEVICE_DEFENDER_TASK,
                "id=%p: subscription packet_id [%d] for accepted topic %s",
                (void *)defender_task,
                sub_accepted_packet_id,
                aws_string_c_str(defender_task->report_accepted_topic_name));
        } else {
            AWS_LOGF_ERROR(
                AWS_LS_IOTDEVICE_DEFENDER_TASK,
                "id=%p: Failed to send subscription packet for topic: %s",
                (void *)defender_task,
                aws_string_c_str(defender_task->report_accepted_topic_name));
        }

        const struct aws_byte_cursor rejected_cursor =
            aws_byte_cursor_from_string(defender_task->report_rejected_topic_name);
        uint16_t sub_rejected_packet_id = aws_mqtt_client_connection_subscribe(
            defender_task->connection,
            &rejected_cursor,
            AWS_MQTT_QOS_AT_LEAST_ONCE,
            &s_on_report_response_rejected,
            defender_task,
            NULL,
            s_mqtt_on_suback,
            defender_task);

        if (sub_accepted_packet_id != 0) {
            AWS_LOGF_TRACE(
                AWS_LS_IOTDEVICE_DEFENDER_TASK,
                "id=%p: subscription packet_id [%d] for rejected topic %s",
                (void *)defender_task,
                sub_rejected_packet_id,
                aws_string_c_str(defender_task->report_rejected_topic_name));
        } else {
            AWS_LOGF_ERROR(
                AWS_LS_IOTDEVICE_DEFENDER_TASK,
                "id=%p: Failed to send subscription packet for rejected topic: %s",
                (void *)defender_task,
                aws_string_c_str(defender_task->report_rejected_topic_name));
        }
    }

    aws_task_init(&defender_task->task, s_reporting_task_fn, defender_task, "DeviceDefenderReportTask");
    *task_out = defender_task;

    AWS_LOGF_TRACE(
        AWS_LS_IOTDEVICE_DEFENDER_TASK, "id=%p: Scheduling defender task for first run", (void *)defender_task);
    aws_event_loop_schedule_task_now(event_loop, &defender_task->task);

    return_code = AWS_OP_SUCCESS;
cleanup:
    if (return_code == AWS_OP_ERR && defender_task != NULL) {
        s_defender_task_clean_up(defender_task);
    }

    return return_code;
}