in src/main/java/org/opensearch/ad/transport/ForwardADTaskTransportAction.java [78:259]
protected void doExecute(Task task, ForwardADTaskRequest request, ActionListener<AnomalyDetectorJobResponse> listener) {
ADTaskAction adTaskAction = request.getAdTaskAction();
AnomalyDetector detector = request.getDetector();
DetectionDateRange detectionDateRange = request.getDetectionDateRange();
String detectorId = detector.getDetectorId();
ADTask adTask = request.getAdTask();
User user = request.getUser();
Integer availableTaskSlots = request.getAvailableTaskSLots();
String entityValue = adTaskManager.convertEntityToString(adTask);
switch (adTaskAction) {
case APPLY_FOR_TASK_SLOTS:
logger.debug("Received APPLY_FOR_TASK_SLOTS action for detector {}", detectorId);
adTaskManager.checkTaskSlots(adTask, detector, detectionDateRange, user, ADTaskAction.START, transportService, listener);
break;
case CHECK_AVAILABLE_TASK_SLOTS:
logger.debug("Received CHECK_AVAILABLE_TASK_SLOTS action for detector {}", detectorId);
adTaskManager
.checkTaskSlots(
adTask,
detector,
detectionDateRange,
user,
ADTaskAction.SCALE_ENTITY_TASK_SLOTS,
transportService,
listener
);
break;
case START:
// Start historical analysis for detector
logger.debug("Received START action for detector {}", detectorId);
adTaskManager.startDetector(detector, detectionDateRange, user, transportService, ActionListener.wrap(r -> {
adTaskCacheManager.setDetectorTaskSlots(detector.getDetectorId(), availableTaskSlots);
listener.onResponse(r);
}, e -> listener.onFailure(e)));
break;
case NEXT_ENTITY:
logger.debug("Received NEXT_ENTITY action for detector {}, task {}", detectorId, adTask.getTaskId());
// Run next entity for HC detector historical analysis.
if (detector.isMultientityDetector()) { // AD task could be HC detector level task or entity task
adTaskCacheManager.removeRunningEntity(detectorId, entityValue);
if (!adTaskCacheManager.hasEntity(detectorId)) {
adTaskCacheManager.setDetectorTaskSlots(detectorId, 0);
logger.info("Historical HC detector done, will remove from cache, detector id:{}", detectorId);
listener.onResponse(new AnomalyDetectorJobResponse(detectorId, 0, 0, 0, RestStatus.OK));
ADTaskState state = !adTask.isEntityTask() && adTask.getError() != null ? ADTaskState.FAILED : ADTaskState.FINISHED;
adTaskManager.setHCDetectorTaskDone(adTask, state, listener);
} else {
logger.debug("Run next entity for detector " + detectorId);
adTaskManager.runNextEntityForHCADHistorical(adTask, transportService, listener);
adTaskManager
.updateADHCDetectorTask(
detectorId,
adTask.getParentTaskId(),
ImmutableMap
.of(
STATE_FIELD,
ADTaskState.RUNNING.name(),
TASK_PROGRESS_FIELD,
adTaskManager.hcDetectorProgress(detectorId),
ERROR_FIELD,
adTask.getError() != null ? adTask.getError() : ""
)
);
}
} else {
logger
.warn(
"Can only handle HC entity task for NEXT_ENTITY action, taskId:{} , taskType:{}",
adTask.getTaskId(),
adTask.getTaskType()
);
listener.onFailure(new IllegalArgumentException("Unsupported task"));
}
break;
case PUSH_BACK_ENTITY:
logger.debug("Received PUSH_BACK_ENTITY action for detector {}, task {}", detectorId, adTask.getTaskId());
// Push back entity to pending entities queue and run next entity.
if (adTask.isEntityTask()) { // AD task must be entity level task.
adTaskCacheManager.removeRunningEntity(detectorId, entityValue);
if (adTaskManager.isRetryableError(adTask.getError())
&& !adTaskCacheManager.exceedRetryLimit(adTask.getDetectorId(), adTask.getTaskId())) {
// If retryable exception happens when run entity task, will push back entity to the end
// of pending entities queue, then we can retry it later.
adTaskCacheManager.pushBackEntity(adTask.getTaskId(), adTask.getDetectorId(), entityValue);
} else {
// If exception is not retryable or exceeds retry limit, will remove this entity.
adTaskCacheManager.removeEntity(adTask.getDetectorId(), entityValue);
logger.warn("Entity task failed, task id: {}, entity: {}", adTask.getTaskId(), adTask.getEntity().toString());
}
if (!adTaskCacheManager.hasEntity(detectorId)) {
adTaskCacheManager.setDetectorTaskSlots(detectorId, 0);
adTaskManager.setHCDetectorTaskDone(adTask, ADTaskState.FINISHED, listener);
} else {
logger.debug("scale task slots for PUSH_BACK_ENTITY, detector {} task {}", detectorId, adTask.getTaskId());
int taskSlots = adTaskCacheManager.scaleDownHCDetectorTaskSlots(detectorId, 1);
if (taskSlots == 1) {
logger.debug("After scale down, only 1 task slot reserved for detector {}, run next entity", detectorId);
adTaskManager.runNextEntityForHCADHistorical(adTask, transportService, listener);
}
listener.onResponse(new AnomalyDetectorJobResponse(adTask.getTaskId(), 0, 0, 0, RestStatus.ACCEPTED));
}
} else {
logger.warn("Can only push back entity task");
listener.onFailure(new IllegalArgumentException("Can only push back entity task"));
}
break;
case SCALE_ENTITY_TASK_SLOTS:
logger.debug("Received SCALE_ENTITY_TASK_LANE action for detector {}", detectorId);
// Check current available task slots and scale entity task lane.
if (availableTaskSlots != null && availableTaskSlots > 0) {
int newSlots = Math.min(availableTaskSlots, adTaskManager.detectorTaskSlotScaleDelta(detectorId));
if (newSlots > 0) {
adTaskCacheManager.setAllowedRunningEntities(detectorId, newSlots);
adTaskCacheManager.scaleUpDetectorTaskSlots(detectorId, newSlots);
}
}
listener.onResponse(new AnomalyDetectorJobResponse(detector.getDetectorId(), 0, 0, 0, RestStatus.OK));
break;
case CANCEL:
logger.debug("Received CANCEL action for detector {}", detectorId);
// Cancel HC detector's historical analysis.
// Don't support single detector for this action as single entity task will be cancelled directly
// on worker node.
if (detector.isMultientityDetector()) {
adTaskCacheManager.clearPendingEntities(detectorId);
adTaskCacheManager.removeRunningEntity(detectorId, entityValue);
if (!adTaskCacheManager.hasEntity(detectorId) || !adTask.isEntityTask()) {
adTaskManager.setHCDetectorTaskDone(adTask, ADTaskState.STOPPED, listener);
}
listener.onResponse(new AnomalyDetectorJobResponse(adTask.getTaskId(), 0, 0, 0, RestStatus.OK));
} else {
listener.onFailure(new IllegalArgumentException("Only support cancel HC now"));
}
break;
case CLEAN_STALE_RUNNING_ENTITIES:
logger.debug("Received CLEAN_STALE_RUNNING_ENTITIES action for detector {}", detectorId);
// Clean stale running entities of HC detector. For example, some worker node crashed or failed to send
// entity task done message to coordinating node, then coordinating node can't remove running entity
// from cache. We will check task profile when get task. If some entities exist in coordinating cache but
// doesn't exist in worker node's cache, we will clean up these stale running entities on coordinating node.
List<String> staleRunningEntities = request.getStaleRunningEntities();
logger
.debug(
"Clean stale running entities of task {}, staleRunningEntities: {}",
adTask.getTaskId(),
Arrays.toString(staleRunningEntities.toArray(new String[0]))
);
for (String entity : staleRunningEntities) {
adTaskManager.removeStaleRunningEntity(adTask, entity, transportService, listener);
}
listener.onResponse(new AnomalyDetectorJobResponse(adTask.getTaskId(), 0, 0, 0, RestStatus.OK));
break;
case CLEAN_CACHE:
boolean historicalTask = adTask.isHistoricalTask();
logger
.debug(
"Received CLEAN_CACHE action for detector {}, taskId: {}, historical: {}",
detectorId,
adTask.getTaskId(),
historicalTask
);
if (historicalTask) {
// Don't clear task cache if still has running entity. CLEAN_STALE_RUNNING_ENTITIES will clean
// stale running entity.
adTaskCacheManager.removeHistoricalTaskCacheIfNoRunningEntity(detectorId);
} else {
adTaskCacheManager.removeRealtimeTaskCache(detectorId);
// If hash ring changed like new node added when scale out, the realtime job coordinating node may
// change, then we should clean up cache on old coordinating node.
stateManager.clear(detectorId);
featureManager.clear(detectorId);
}
listener.onResponse(new AnomalyDetectorJobResponse(detector.getDetectorId(), 0, 0, 0, RestStatus.OK));
break;
default:
listener.onFailure(new OpenSearchStatusException("Unsupported AD task action " + adTaskAction, RestStatus.BAD_REQUEST));
break;
}
}