private void runNextPiece()

in src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java [1216:1303]


    /**
     * Runs the next piece of a batch task, or marks the task as finished once
     * {@code pieceStartTime} has reached {@code dataEndTime}.
     *
     * <p>While {@code pieceStartTime < dataEndTime}, this first calls
     * {@code checkIfADTaskCancelledAndCleanupCache} on the calling thread and then
     * schedules a job (delayed by {@code pieceIntervalSeconds}, on the AD batch task
     * thread pool) that calls {@code checkClusterState}, persists the task's state,
     * current piece, task progress and init progress, and finally calls
     * {@code getFeatureData} for the range {@code [pieceStartTime, pieceEndTime]}.
     * Otherwise the task is removed from {@code adTaskCacheManager}, its final fields
     * are persisted, and {@code internalListener} receives {@code "task execution done"}.
     *
     * <p>Any exception thrown synchronously inside the scheduled job is forwarded to
     * {@code internalListener.onFailure}; it would otherwise be raised on the pool
     * thread, where no caller can observe it.
     *
     * @param adTask task being executed
     * @param pieceStartTime start of the piece to run next (presumably epoch millis; confirm with caller)
     * @param dataStartTime start of the whole data range of the task
     * @param dataEndTime end of the whole data range of the task
     * @param interval multiplied by {@code pieceSize} to get the length of one piece
     * @param internalListener notified when the task is done or when a failure occurs
     */
    private void runNextPiece(
        ADTask adTask,
        long pieceStartTime,
        long dataStartTime,
        long dataEndTime,
        long interval,
        ActionListener<String> internalListener
    ) {
        String taskId = adTask.getTaskId();
        String detectorId = adTask.getDetectorId();
        String detectorTaskId = adTask.getDetectorLevelTaskId();
        float initProgress = calculateInitProgress(taskId);
        String taskState = initProgress >= 1.0f ? ADTaskState.RUNNING.name() : ADTaskState.INIT.name();
        logger.debug("Init progress: {}, taskState:{}, task id: {}", initProgress, taskState, taskId);

        if (initProgress >= 1.0f && adTask.isEntityTask()) {
            updateDetectorLevelTaskState(detectorId, adTask.getParentTaskId(), ADTaskState.RUNNING.name());
        }

        if (pieceStartTime < dataEndTime) {
            checkIfADTaskCancelledAndCleanupCache(adTask);
            threadPool.schedule(() -> {
                // This runs on a pool thread: anything thrown here never propagates to the
                // caller of runNextPiece, so route it to the listener explicitly.
                try {
                    checkClusterState(adTask);
                    long expectedPieceEndTime = pieceStartTime + pieceSize * interval;
                    // The last piece may be shorter than a full piece; never read past the data end.
                    long pieceEndTime = Math.min(expectedPieceEndTime, dataEndTime);
                    logger
                        .debug(
                            "task id: {}, start next piece start from {} to {}, interval {}",
                            adTask.getTaskId(),
                            pieceStartTime,
                            pieceEndTime,
                            interval
                        );
                    // dataEndTime > dataStartTime is expected here because pieceStartTime < dataEndTime.
                    float taskProgress = (float) (pieceStartTime - dataStartTime) / (dataEndTime - dataStartTime);
                    logger.debug("Task progress: {}, task id:{}, detector id:{}", taskProgress, taskId, detectorId);
                    adTaskManager
                        .updateADTask(
                            taskId,
                            ImmutableMap
                                .of(
                                    STATE_FIELD,
                                    taskState,
                                    CURRENT_PIECE_FIELD,
                                    pieceStartTime,
                                    TASK_PROGRESS_FIELD,
                                    taskProgress,
                                    INIT_PROGRESS_FIELD,
                                    initProgress
                                ),
                            ActionListener
                                .wrap(
                                    r -> getFeatureData(
                                        adTask,
                                        pieceStartTime,
                                        pieceEndTime,
                                        dataStartTime,
                                        dataEndTime,
                                        interval,
                                        Instant.now(),
                                        internalListener
                                    ),
                                    internalListener::onFailure
                                )
                        );
                } catch (Exception e) {
                    internalListener.onFailure(e);
                }
            }, TimeValue.timeValueSeconds(pieceIntervalSeconds), AD_BATCH_TASK_THREAD_POOL_NAME);
        } else {
            logger.info("AD task finished for detector {}, task id: {}", detectorId, taskId);
            adTaskCacheManager.remove(taskId, detectorId, detectorTaskId);
            adTaskManager
                .updateADTask(
                    taskId,
                    ImmutableMap
                        .of(
                            CURRENT_PIECE_FIELD,
                            dataEndTime,
                            TASK_PROGRESS_FIELD,
                            1.0f,
                            EXECUTION_END_TIME_FIELD,
                            Instant.now().toEpochMilli(),
                            INIT_PROGRESS_FIELD,
                            initProgress,
                            STATE_FIELD,
                            // Store the state name as a String, like every other state write in this method.
                            ADTaskState.FINISHED.name()
                        ),
                    ActionListener.wrap(r -> internalListener.onResponse("task execution done"), internalListener::onFailure)
                );
        }
    }