in src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java [1216:1303]
private void runNextPiece(
ADTask adTask,
long pieceStartTime,
long dataStartTime,
long dataEndTime,
long interval,
ActionListener<String> internalListener
) {
String taskId = adTask.getTaskId();
String detectorId = adTask.getDetectorId();
String detectorTaskId = adTask.getDetectorLevelTaskId();
float initProgress = calculateInitProgress(taskId);
String taskState = initProgress >= 1.0f ? ADTaskState.RUNNING.name() : ADTaskState.INIT.name();
logger.debug("Init progress: {}, taskState:{}, task id: {}", initProgress, taskState, taskId);
if (initProgress >= 1.0f && adTask.isEntityTask()) {
updateDetectorLevelTaskState(detectorId, adTask.getParentTaskId(), ADTaskState.RUNNING.name());
}
if (pieceStartTime < dataEndTime) {
checkIfADTaskCancelledAndCleanupCache(adTask);
threadPool.schedule(() -> {
checkClusterState(adTask);
long expectedPieceEndTime = pieceStartTime + pieceSize * interval;
long pieceEndTime = expectedPieceEndTime > dataEndTime ? dataEndTime : expectedPieceEndTime;
logger
.debug(
"task id: {}, start next piece start from {} to {}, interval {}",
adTask.getTaskId(),
pieceStartTime,
pieceEndTime,
interval
);
float taskProgress = (float) (pieceStartTime - dataStartTime) / (dataEndTime - dataStartTime);
logger.debug("Task progress: {}, task id:{}, detector id:{}", taskProgress, taskId, detectorId);
adTaskManager
.updateADTask(
taskId,
ImmutableMap
.of(
STATE_FIELD,
taskState,
CURRENT_PIECE_FIELD,
pieceStartTime,
TASK_PROGRESS_FIELD,
taskProgress,
INIT_PROGRESS_FIELD,
initProgress
),
ActionListener
.wrap(
r -> getFeatureData(
adTask,
pieceStartTime,
pieceEndTime,
dataStartTime,
dataEndTime,
interval,
Instant.now(),
internalListener
),
e -> internalListener.onFailure(e)
)
);
}, TimeValue.timeValueSeconds(pieceIntervalSeconds), AD_BATCH_TASK_THREAD_POOL_NAME);
} else {
logger.info("AD task finished for detector {}, task id: {}", detectorId, taskId);
adTaskCacheManager.remove(taskId, detectorId, detectorTaskId);
adTaskManager
.updateADTask(
taskId,
ImmutableMap
.of(
CURRENT_PIECE_FIELD,
dataEndTime,
TASK_PROGRESS_FIELD,
1.0f,
EXECUTION_END_TIME_FIELD,
Instant.now().toEpochMilli(),
INIT_PROGRESS_FIELD,
initProgress,
STATE_FIELD,
ADTaskState.FINISHED
),
ActionListener.wrap(r -> internalListener.onResponse("task execution done"), e -> internalListener.onFailure(e))
);
}
}