in src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java [491:603]
/**
 * Forwards or executes a historical AD task, handling multi-entity (HC) detectors and
 * all other detectors differently.
 *
 * <p>Multi-entity detector: one entity is polled from the task cache. If no entity is
 * returned, {@code listener} receives a response built from the local node id and
 * {@code false}, and the method returns. Otherwise the latest
 * {@code HISTORICAL_HC_ENTITY} task for that entity is looked up under the parent task:
 * <ul>
 *   <li>if one exists, it is rerun;</li>
 *   <li>if none exists, a new entity task is built, persisted through
 *       {@code createADTaskDirectly}, and then run.</li>
 * </ul>
 * If the lookup or the creation fails, the error is logged and the entity is put back
 * into the pending entity queue.
 *
 * <p>Other detectors: the task's state is updated to {@code INIT} with init progress
 * {@code 0.0f}; once the update succeeds the task is forwarded or executed, and an update
 * failure is passed to the worker node response listener.
 *
 * <p>Any exception thrown synchronously inside this method is logged and passed to
 * {@code listener.onFailure}.
 *
 * @param adTask           the AD task to dispatch; for HC detectors it may be either the
 *                         detector-level task or an entity-level task
 * @param transportService transport service handed on to the task manager and worker calls
 * @param listener         listener that receives the batch result response or a failure
 */
public void forwardOrExecuteADTask(
    ADTask adTask,
    TransportService transportService,
    ActionListener<ADBatchAnomalyResultResponse> listener
) {
    try {
        checkIfADTaskCancelledAndCleanupCache(adTask);
        String detectorId = adTask.getDetectorId();
        AnomalyDetector detector = adTask.getDetector();
        boolean isHCDetector = detector.isMultientityDetector();
        if (isHCDetector) {
            String entityString = adTaskCacheManager.pollEntity(detectorId);
            logger.debug("Start to run entity: {} of detector {}", entityString, detectorId);
            if (entityString == null) {
                // Nothing was polled for this detector: answer with the local node id and "false".
                listener.onResponse(new ADBatchAnomalyResultResponse(clusterService.localNode().getId(), false));
                return;
            }
            // Shared failure handling for both the "find latest entity task" and the
            // "create entity task" steps below.
            // NOTE(review): this handler does not notify `listener`; it only logs and re-queues
            // the entity. Confirm that callers do not wait on `listener` in this path.
            ActionListener<Object> wrappedListener = ActionListener.wrap(r -> logger.debug("Entity task created successfully"), e -> {
                // Pass the exception as the trailing argument so the cause and stack trace are logged.
                logger.error("Failed to start entity task for detector: {}, entity: {}", detectorId, entityString, e);
                // If fail, move the entity into pending task queue
                adTaskCacheManager.addPendingEntity(detectorId, entityString);
            });
            // This is to handle retry case. To retry entity, we need to get the old entity task created before.
            Entity entity = adTaskManager.parseEntityFromString(entityString, adTask);
            String parentTaskId = adTask.getTaskType().equals(ADTaskType.HISTORICAL_HC_ENTITY.name())
                ? adTask.getParentTaskId() // For HISTORICAL_HC_ENTITY task, return its parent task id
                : adTask.getTaskId(); // For HISTORICAL_HC_DETECTOR task, its task id is parent task id
            adTaskManager
                .getAndExecuteOnLatestADTask(
                    detectorId,
                    parentTaskId,
                    entity,
                    ImmutableList.of(ADTaskType.HISTORICAL_HC_ENTITY),
                    existingEntityTask -> {
                        if (existingEntityTask.isPresent()) { // retry failed entity caused by limit exceed exception
                            // TODO: if task failed due to limit exceed exception in half way, resume from the break
                            // point or just clear the old AD tasks and rerun it? Currently we just support rerunning
                            // task failed due to limit exceed exception before starting.
                            ADTask adEntityTask = existingEntityTask.get();
                            logger
                                .debug(
                                    "Rerun entity task for task id: {}, error of last run: {}",
                                    adEntityTask.getTaskId(),
                                    adEntityTask.getError()
                                );
                            ActionListener<ADBatchAnomalyResultResponse> workerNodeResponseListener = workerNodeResponseListener(
                                adEntityTask,
                                transportService,
                                listener
                            );
                            forwardOrExecuteEntityTask(adEntityTask, transportService, workerNodeResponseListener);
                        } else {
                            logger.info("Create entity task for entity:{}", entityString);
                            Instant now = Instant.now();
                            ADTask adEntityTask = new ADTask.Builder()
                                .detectorId(adTask.getDetectorId())
                                .detector(detector)
                                .isLatest(true)
                                .taskType(ADTaskType.HISTORICAL_HC_ENTITY.name())
                                .executionStartTime(now)
                                .taskProgress(0.0f)
                                .initProgress(0.0f)
                                .state(ADTaskState.INIT.name())
                                .lastUpdateTime(now)
                                .startedBy(adTask.getStartedBy())
                                .coordinatingNode(clusterService.localNode().getId())
                                .detectionDateRange(adTask.getDetectionDateRange())
                                .user(adTask.getUser())
                                .entity(entity)
                                .parentTaskId(parentTaskId)
                                .build();
                            adTaskManager.createADTaskDirectly(adEntityTask, r -> {
                                // The task id is only known after the task has been created.
                                adEntityTask.setTaskId(r.getId());
                                ActionListener<ADBatchAnomalyResultResponse> workerNodeResponseListener = workerNodeResponseListener(
                                    adEntityTask,
                                    transportService,
                                    listener
                                );
                                forwardOrExecuteEntityTask(adEntityTask, transportService, workerNodeResponseListener);
                            }, wrappedListener);
                        }
                    },
                    transportService,
                    false,
                    wrappedListener
                );
        } else {
            // Not a multi-entity detector: reset the task to INIT before running it.
            Map<String, Object> updatedFields = new HashMap<>();
            updatedFields.put(STATE_FIELD, ADTaskState.INIT.name());
            updatedFields.put(INIT_PROGRESS_FIELD, 0.0f);
            ActionListener<ADBatchAnomalyResultResponse> workerNodeResponseListener = workerNodeResponseListener(
                adTask,
                transportService,
                listener
            );
            adTaskManager
                .updateADTask(
                    adTask.getTaskId(),
                    updatedFields,
                    ActionListener
                        .wrap(
                            r -> forwardOrExecuteEntityTask(adTask, transportService, workerNodeResponseListener),
                            e -> { workerNodeResponseListener.onFailure(e); }
                        )
                );
        }
    } catch (Exception e) {
        logger.error("Failed to forward or execute AD task " + adTask.getTaskId(), e);
        listener.onFailure(e);
    }
}