public void forwardOrExecuteADTask()

in src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java [491:603]


    public void forwardOrExecuteADTask(
        ADTask adTask,
        TransportService transportService,
        ActionListener<ADBatchAnomalyResultResponse> listener
    ) {
        try {
            checkIfADTaskCancelledAndCleanupCache(adTask);
            String detectorId = adTask.getDetectorId();
            AnomalyDetector detector = adTask.getDetector();
            boolean isHCDetector = detector.isMultientityDetector();
            if (isHCDetector) {
                String entityString = adTaskCacheManager.pollEntity(detectorId);
                logger.debug("Start to run entity: {} of detector {}", entityString, detectorId);
                if (entityString == null) {
                    listener.onResponse(new ADBatchAnomalyResultResponse(clusterService.localNode().getId(), false));
                    return;
                }
                ActionListener<Object> wrappedListener = ActionListener.wrap(r -> logger.debug("Entity task created successfully"), e -> {
                    logger.error("Failed to start entity task for detector: {}, entity: {}", detectorId, entityString);
                    // If fail, move the entity into pending task queue
                    adTaskCacheManager.addPendingEntity(detectorId, entityString);
                });
                // This is to handle retry case. To retry entity, we need to get the old entity task created before.
                Entity entity = adTaskManager.parseEntityFromString(entityString, adTask);
                String parentTaskId = adTask.getTaskType().equals(ADTaskType.HISTORICAL_HC_ENTITY.name())
                    ? adTask.getParentTaskId() // For HISTORICAL_HC_ENTITY task, return its parent task id
                    : adTask.getTaskId(); // For HISTORICAL_HC_DETECTOR task, its task id is parent task id
                adTaskManager
                    .getAndExecuteOnLatestADTask(
                        detectorId,
                        parentTaskId,
                        entity,
                        ImmutableList.of(ADTaskType.HISTORICAL_HC_ENTITY),
                        existingEntityTask -> {
                            if (existingEntityTask.isPresent()) { // retry failed entity caused by limit exceed exception
                                // TODO: if task failed due to limit exceed exception in half way, resume from the break point or just clear
                                // the
                                // old AD tasks and rerun it? Currently we just support rerunning task failed due to limit exceed exception
                                // before starting.
                                ADTask adEntityTask = existingEntityTask.get();
                                logger
                                    .debug(
                                        "Rerun entity task for task id: {}, error of last run: {}",
                                        adEntityTask.getTaskId(),
                                        adEntityTask.getError()
                                    );
                                ActionListener<ADBatchAnomalyResultResponse> workerNodeResponseListener = workerNodeResponseListener(
                                    adEntityTask,
                                    transportService,
                                    listener
                                );
                                forwardOrExecuteEntityTask(adEntityTask, transportService, workerNodeResponseListener);
                            } else {
                                logger.info("Create entity task for entity:{}", entityString);
                                Instant now = Instant.now();
                                ADTask adEntityTask = new ADTask.Builder()
                                    .detectorId(adTask.getDetectorId())
                                    .detector(detector)
                                    .isLatest(true)
                                    .taskType(ADTaskType.HISTORICAL_HC_ENTITY.name())
                                    .executionStartTime(now)
                                    .taskProgress(0.0f)
                                    .initProgress(0.0f)
                                    .state(ADTaskState.INIT.name())
                                    .initProgress(0.0f)
                                    .lastUpdateTime(now)
                                    .startedBy(adTask.getStartedBy())
                                    .coordinatingNode(clusterService.localNode().getId())
                                    .detectionDateRange(adTask.getDetectionDateRange())
                                    .user(adTask.getUser())
                                    .entity(entity)
                                    .parentTaskId(parentTaskId)
                                    .build();
                                adTaskManager.createADTaskDirectly(adEntityTask, r -> {
                                    adEntityTask.setTaskId(r.getId());
                                    ActionListener<ADBatchAnomalyResultResponse> workerNodeResponseListener = workerNodeResponseListener(
                                        adEntityTask,
                                        transportService,
                                        listener
                                    );
                                    forwardOrExecuteEntityTask(adEntityTask, transportService, workerNodeResponseListener);
                                }, wrappedListener);
                            }
                        },
                        transportService,
                        false,
                        wrappedListener
                    );
            } else {
                Map<String, Object> updatedFields = new HashMap<>();
                updatedFields.put(STATE_FIELD, ADTaskState.INIT.name());
                updatedFields.put(INIT_PROGRESS_FIELD, 0.0f);
                ActionListener<ADBatchAnomalyResultResponse> workerNodeResponseListener = workerNodeResponseListener(
                    adTask,
                    transportService,
                    listener
                );
                adTaskManager
                    .updateADTask(
                        adTask.getTaskId(),
                        updatedFields,
                        ActionListener
                            .wrap(
                                r -> forwardOrExecuteEntityTask(adTask, transportService, workerNodeResponseListener),
                                e -> { workerNodeResponseListener.onFailure(e); }
                            )
                    );
            }
        } catch (Exception e) {
            logger.error("Failed to forward or execute AD task " + adTask.getTaskId(), e);
            listener.onFailure(e);
        }
    }