public void checkTaskSlots()

in src/main/java/org/opensearch/ad/task/ADTaskManager.java [556:653]


    /**
     * Checks how many batch task slots are still free and, when there is room, forwards the request
     * to {@code forwardToCoordinatingNode} together with the number of slots it may use.
     *
     * <p>A permit is taken from {@code checkingTaskSlot} with {@code tryAcquire()} before any work is
     * done. If no permit is available the listener fails immediately. Otherwise the listener is
     * wrapped so that the permit is released right after the listener has been notified.
     *
     * <p>Slot usage is collected from the nodes returned by
     * {@code hashRing.getNodesWithSameLocalAdVersion} through an {@code ADStatsNodesAction} request.
     *
     * @param adTask             passed through to {@code forwardToCoordinatingNode}
     * @param detector           detector whose id is used for logging; also decides (via
     *                           {@code isMultientityDetector()}) how many slots are approved
     * @param detectionDateRange passed through to {@code forwardToCoordinatingNode}
     * @param user               passed through to {@code forwardToCoordinatingNode}
     * @param afterCheckAction   action forwarded after the check; for
     *                           {@code SCALE_ENTITY_TASK_SLOTS} all available slots are forwarded
     * @param transportService   passed through to {@code forwardToCoordinatingNode}
     * @param listener           receives a failure when no permit or no task slot is available, or when
     *                           collecting node stats fails; otherwise handed on (wrapped) to
     *                           {@code forwardToCoordinatingNode}
     */
    public void checkTaskSlots(
        ADTask adTask,
        AnomalyDetector detector,
        DetectionDateRange detectionDateRange,
        User user,
        ADTaskAction afterCheckAction,
        TransportService transportService,
        ActionListener<AnomalyDetectorJobResponse> listener
    ) {
        final String detectorId = detector.getDetectorId();
        logger.debug("Start checking task slots for detector: {}, task action: {}", detectorId, afterCheckAction);

        final boolean permitAcquired = checkingTaskSlot.tryAcquire();
        if (!permitAcquired) {
            logger.info("Can't acquire checking task slot semaphore for detector {}", detectorId);
            OpenSearchStatusException noPermit = new OpenSearchStatusException(
                "Too many historical analysis requests in short time. Please retry later.",
                RestStatus.FORBIDDEN
            );
            listener.onFailure(noPermit);
            return;
        }

        // From here on every outcome must go through this listener so the permit is handed back.
        final ActionListener<AnomalyDetectorJobResponse> releasingListener = ActionListener.runAfter(listener, () -> {
            checkingTaskSlot.release(1);
            logger.debug("Release checking task slot semaphore on lead node for detector {}", detectorId);
        });

        hashRing.getNodesWithSameLocalAdVersion(nodes -> {
            final int slotCapacity = nodes.length * maxAdBatchTaskPerNode;
            final String usedSlotStat = AD_USED_BATCH_TASK_SLOT_COUNT.getName();
            final String assignedSlotStat = AD_DETECTOR_ASSIGNED_BATCH_TASK_SLOT_COUNT.getName();

            ADStatsRequest statsRequest = new ADStatsRequest(nodes);
            statsRequest.addAll(ImmutableSet.of(usedSlotStat, assignedSlotStat));

            client.execute(ADStatsNodesAction.INSTANCE, statsRequest, ActionListener.wrap(statsResponse -> {
                // usedSlots: entity tasks running on worker nodes.
                // assignedSlots: task slots assigned on coordinating nodes.
                int usedSlots = 0;
                int assignedSlots = 0;
                for (ADStatsNodeResponse nodeStats : statsResponse.getNodes()) {
                    usedSlots += (int) nodeStats.getStatsMap().get(usedSlotStat);
                    assignedSlots += (int) nodeStats.getStatsMap().get(assignedSlotStat);
                }
                logger.info(
                    "Current total used task slots is {}, total detector assigned task slots is {} "
                        + "when start historical analysis for detector {}",
                    usedSlots,
                    assignedSlots,
                    detectorId
                );

                // Normally assigned >= used. When a coordinating node has left, the detector task slots
                // cached on it can't be fetched, so assigned may drop below used; take the larger value.
                final int slotsInUse = Math.max(usedSlots, assignedSlots);
                if (slotCapacity <= slotsInUse) {
                    releasingListener.onFailure(new OpenSearchStatusException("No available task slot", RestStatus.BAD_REQUEST));
                    return;
                }
                final int freeSlots = slotCapacity - slotsInUse;
                logger.info("Current available task slots is {} for historical analysis of detector {}", freeSlots, detectorId);

                final int slotsToForward;
                if (ADTaskAction.SCALE_ENTITY_TASK_SLOTS == afterCheckAction) {
                    // Scaling requests are told about every free slot.
                    slotsToForward = freeSlots;
                } else if (detector.isMultientityDetector()) {
                    // Top entities are deliberately not checked here: for multi-category HC that check
                    // is slow (tested with 1.8 billion docs it took more than 20 seconds and timed out,
                    // versus about 200ms without it), and the REST API has to return quickly.
                    // As a result more slots than needed may be approved. Example: 4 data nodes with 2
                    // batch tasks each gives 8 available slots; with max 4 running entities per HC the
                    // detector gets 4 slots even if the index only holds 2 entities. That is acceptable
                    // because task slots are auto tuned once the historical analysis task starts.
                    slotsToForward = Math.min(maxRunningEntitiesPerDetector, freeSlots);
                } else {
                    slotsToForward = 1;
                }

                forwardToCoordinatingNode(
                    adTask,
                    detector,
                    detectionDateRange,
                    user,
                    afterCheckAction,
                    transportService,
                    releasingListener,
                    slotsToForward
                );
            }, exception -> {
                logger.error("Failed to get node's task stats for detector " + detectorId, exception);
                releasingListener.onFailure(exception);
            }));
        }, releasingListener);
    }