in src/main/java/org/opensearch/ad/task/ADTaskManager.java [556:653]
/**
 * Checks whether batch task slots are still available for historical analysis of the given
 * detector and, when they are, forwards the request to the coordinating node together with the
 * number of task slots it may use.
 *
 * <p>Only one permit holder of {@code checkingTaskSlot} runs the check at a time per permit; if no
 * permit can be acquired the listener fails immediately with {@link RestStatus#FORBIDDEN}. Once a
 * permit is held, the listener is wrapped so that the permit is given back after the listener has
 * been notified.
 *
 * <p>The slot budget is {@code nodes.length * maxAdBatchTaskPerNode}. The slots currently in use
 * are taken as the larger of the summed "used" and the summed "assigned" slot counts reported by
 * the nodes. When nothing is left, the listener fails with {@link RestStatus#BAD_REQUEST}.
 *
 * @param adTask             task handed through to the coordinating node
 * @param detector           detector whose historical analysis is being started or scaled
 * @param detectionDateRange date range handed through to the coordinating node
 * @param user               user handed through to the coordinating node
 * @param afterCheckAction   action to run once slots are approved; for
 *                           {@code SCALE_ENTITY_TASK_SLOTS} all free slots are passed along
 * @param transportService   transport service handed through to the coordinating node
 * @param listener           notified with the outcome, or with the failure when the check is rejected
 */
public void checkTaskSlots(
    ADTask adTask,
    AnomalyDetector detector,
    DetectionDateRange detectionDateRange,
    User user,
    ADTaskAction afterCheckAction,
    TransportService transportService,
    ActionListener<AnomalyDetectorJobResponse> listener
) {
    final String detectorId = detector.getDetectorId();
    logger.debug("Start checking task slots for detector: {}, task action: {}", detectorId, afterCheckAction);

    // Reject right away when another check already holds the permit.
    boolean permitAcquired = checkingTaskSlot.tryAcquire();
    if (!permitAcquired) {
        logger.info("Can't acquire checking task slot semaphore for detector {}", detectorId);
        OpenSearchStatusException tooManyRequests = new OpenSearchStatusException(
            "Too many historical analysis requests in short time. Please retry later.",
            RestStatus.FORBIDDEN
        );
        listener.onFailure(tooManyRequests);
        return;
    }

    // From here on every outcome must go through this listener so the permit is handed back.
    final ActionListener<AnomalyDetectorJobResponse> releasingListener = ActionListener.runAfter(listener, () -> {
        checkingTaskSlot.release(1);
        logger.debug("Release checking task slot semaphore on lead node for detector {}", detectorId);
    });

    // NOTE(review): the second argument is presumably used to report failures while resolving nodes - confirm.
    hashRing.getNodesWithSameLocalAdVersion(eligibleNodes -> {
        final int slotCapacity = eligibleNodes.length * maxAdBatchTaskPerNode;

        ADStatsRequest statsRequest = new ADStatsRequest(eligibleNodes);
        statsRequest.addAll(ImmutableSet.of(AD_USED_BATCH_TASK_SLOT_COUNT.getName(), AD_DETECTOR_ASSIGNED_BATCH_TASK_SLOT_COUNT.getName()));

        client.execute(ADStatsNodesAction.INSTANCE, statsRequest, ActionListener.wrap(statsResponse -> {
            int slotsBusyOnWorkers = 0; // entity tasks running on worker nodes
            int slotsReservedByCoordinators = 0; // task slots assigned on coordinating nodes
            for (ADStatsNodeResponse nodeStats : statsResponse.getNodes()) {
                slotsBusyOnWorkers += (int) nodeStats.getStatsMap().get(AD_USED_BATCH_TASK_SLOT_COUNT.getName());
                slotsReservedByCoordinators += (int) nodeStats.getStatsMap().get(AD_DETECTOR_ASSIGNED_BATCH_TASK_SLOT_COUNT.getName());
            }
            logger.info(
                "Current total used task slots is {}, total detector assigned task slots is {} when start historical "
                    + "analysis for detector {}",
                slotsBusyOnWorkers,
                slotsReservedByCoordinators,
                detectorId
            );

            // Normally assigned >= used. If a coordinating node left the cluster, the slots cached on it
            // are no longer reported, so assigned can drop below used; take the larger of the two.
            int slotsInUse = Math.max(slotsBusyOnWorkers, slotsReservedByCoordinators);
            if (slotCapacity <= slotsInUse) {
                releasingListener.onFailure(new OpenSearchStatusException("No available task slot", RestStatus.BAD_REQUEST));
                return;
            }

            int freeSlots = slotCapacity - slotsInUse;
            logger.info("Current available task slots is {} for historical analysis of detector {}", freeSlots, detectorId);

            final int slotsToGrant;
            if (ADTaskAction.SCALE_ENTITY_TASK_SLOTS == afterCheckAction) {
                // Scaling: hand over everything that is currently free.
                slotsToGrant = freeSlots;
            } else if (detector.isMultientityDetector()) {
                // Top entities are deliberately not counted here: on a multi-category HC detector with
                // 1.8 billion docs that check took more than 20 seconds and timed out, while skipping it
                // returns in about 200ms, which keeps the REST API responsive.
                // The price is possible over-assignment. Example: 4 data nodes with 2 batch tasks each
                // give 8 free slots; with at most 4 running entities per HC detector, 4 slots are
                // assigned even if the index only holds 2 entities. That is acceptable because task
                // slots are auto tuned once the historical analysis task starts.
                slotsToGrant = Math.min(maxRunningEntitiesPerDetector, freeSlots);
            } else {
                // Detectors that are not multi-entity get exactly one slot.
                slotsToGrant = 1;
            }

            forwardToCoordinatingNode(
                adTask,
                detector,
                detectionDateRange,
                user,
                afterCheckAction,
                transportService,
                releasingListener,
                slotsToGrant
            );
        }, statsFailure -> {
            logger.error("Failed to get node's task stats for detector " + detectorId, statsFailure);
            releasingListener.onFailure(statsFailure);
        }));
    }, releasingListener);
}