in mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/master/resourcecluster/ExecutorStateManagerImpl.java [452:524]
public GetClusterUsageResponse getClusterUsage(GetClusterUsageRequest req) {
// default grouping is containerSkuID to usage
Map<String, Integer> pendingCountByGroupKey = new HashMap<>();
Map<String, Pair<Integer, Integer>> usageByGroupKey = new HashMap<>();
// helper struct to verify job has been fully deployed so we can remove it from pending
Map<String, List<MachineDefinition>> jobIdToMachineDef = new HashMap<>();
taskExecutorStateMap.forEach((key, value) -> {
if (value == null ||
value.getRegistration() == null) {
log.info("Empty registration: {}, {}. Skip usage request.", req.getClusterID(), key);
return;
}
Optional<String> groupKeyO =
req.getGroupKeyFunc().apply(value.getRegistration());
if (!groupKeyO.isPresent()) {
log.info("Empty groupKey from: {}, {}. Skip usage request.", req.getClusterID(), key);
return;
}
String groupKey = groupKeyO.get();
Pair<Integer, Integer> kvState = Pair.of(
value.isAvailable() && !value.isDisabled() ? 1 : 0,
value.isRegistered() ? 1 : 0);
if (usageByGroupKey.containsKey(groupKey)) {
Pair<Integer, Integer> prevState = usageByGroupKey.get(groupKey);
usageByGroupKey.put(
groupKey,
Pair.of(
kvState.getLeft() + prevState.getLeft(), kvState.getRight() + prevState.getRight()));
} else {
usageByGroupKey.put(groupKey, kvState);
}
if ((value.isAssigned() || value.isRunningTask()) && value.getWorkerId() != null) {
if (pendingJobRequests.getIfPresent(value.getWorkerId().getJobId()) != null) {
List<MachineDefinition> workers = jobIdToMachineDef.getOrDefault(value.getWorkerId().getJobId(), new ArrayList<>());
workers.add(value.getRegistration().getMachineDefinition());
jobIdToMachineDef.put(value.getWorkerId().getJobId(), workers);
}
}
if (!pendingCountByGroupKey.containsKey(groupKey)) {
pendingCountByGroupKey.put(
groupKey,
getPendingCountByTaskExecutorGroup(value.getRegistration().getGroup()));
}
});
// remove jobs from pending set which have all pending workers
jobIdToMachineDef.forEach((jobId, workers) -> {
final JobRequirements jobStats = pendingJobRequests.getIfPresent(jobId);
if (jobStats != null && jobStats.getTotalWorkers() <= workers.size()) {
log.info("Removing job {} from pending requests", jobId);
pendingJobRequests.invalidate(jobId);
}
});
GetClusterUsageResponseBuilder resBuilder = GetClusterUsageResponse.builder().clusterID(req.getClusterID());
usageByGroupKey.forEach((key, value) -> resBuilder.usage(UsageByGroupKey.builder()
.usageGroupKey(key)
.idleCount(value.getLeft() - pendingCountByGroupKey.get(key))
.totalCount(value.getRight())
.build()));
GetClusterUsageResponse res = resBuilder.build();
log.info("Usage result: {}", res);
return res;
}