in storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverStrategy.java [129:247]
/**
 * Checks whether the current assignment of {@code topo} in {@code cluster} is a valid solution.
 *
 * <p>Every violation found is collected (not only the first one) and all of them are logged together
 * at ERROR level. The checks are:
 * <ol>
 *   <li>MaxNodeCoLocation: no node hosts more executors of a component than the configured maximum.</li>
 *   <li>IncompatibleComponents: no worker slot hosts two components configured as incompatible.</li>
 *   <li>Resources: no node has negative available CPU or memory, and each node's available CPU/memory
 *       matches its total minus what this topology's executors on that node request.
 *       NOTE(review): this comparison only accounts for {@code topo}'s executors, so it assumes no other
 *       topology is using the node — confirm for multi-tenant clusters.</li>
 * </ol>
 *
 * @param cluster the cluster holding the assignment to validate
 * @param topo the topology whose assignment is checked
 * @return {@code true} if no violation was found, {@code false} otherwise
 * @throws AssertionError if the cluster has no assignment for {@code topo}
 */
public static boolean validateSolution(Cluster cluster, TopologyDetails topo) {
    LOG.debug("Checking for a valid scheduling for topology {}...", topo.getName());
    SchedulerAssignment assignment = cluster.getAssignmentById(topo.getId());
    if (assignment == null) {
        String err = "cluster.getAssignmentById(\"" + topo.getId() + "\") returned null";
        LOG.error(err);
        throw new AssertionError("No assignments for topologyId " + topo.getId());
    }
    // Take one null-safe snapshot of the placement and use it for every check below.
    Map<ExecutorDetails, WorkerSlot> execToSlot = new HashMap<>();
    if (assignment.getExecutorToSlot() != null) {
        execToSlot.putAll(assignment.getExecutorToSlot());
    }
    ConstraintSolverConfig constraintSolverConfig = new ConstraintSolverConfig(topo);
    Map<ExecutorDetails, String> execToComp = topo.getExecutorToComponent();
    List<String> errors = new ArrayList<>();

    // First check NodeCoLocationCnt constraints: count executors per (node id, component).
    Map<String, Map<String, Integer>> nodeCompMap = new HashMap<>();
    for (Map.Entry<ExecutorDetails, WorkerSlot> entry : execToSlot.entrySet()) {
        ExecutorDetails exec = entry.getKey();
        String comp = execToComp.get(exec);
        if (comp == null || !constraintSolverConfig.getMaxNodeCoLocationCnts().containsKey(comp)) {
            continue;
        }
        // Use the slot's own node id; no lookup that could yield null is needed just to count.
        String nodeId = entry.getValue().getNodeId();
        int allowedColocationMaxCnt = constraintSolverConfig.getMaxNodeCoLocationCnts().get(comp);
        int cnt = nodeCompMap.computeIfAbsent(nodeId, k -> new HashMap<>()).merge(comp, 1, Integer::sum);
        if (allowedColocationMaxCnt < cnt) {
            errors.add(String.format("MaxNodeCoLocation: Component %s (exec=%s) on node %s, cnt %d > allowed %d",
                comp, exec, nodeId, cnt, allowedColocationMaxCnt));
        }
    }

    // Second check IncompatibleComponent constraints: group components by worker slot.
    Map<WorkerSlot, Set<String>> workerCompMap = new HashMap<>();
    for (Map.Entry<ExecutorDetails, WorkerSlot> entry : execToSlot.entrySet()) {
        String comp = execToComp.get(entry.getKey());
        if (comp != null) {
            workerCompMap.computeIfAbsent(entry.getValue(), k -> new HashSet<>()).add(comp);
        }
    }
    for (Map.Entry<WorkerSlot, Set<String>> entry : workerCompMap.entrySet()) {
        List<String> comps = new ArrayList<>(entry.getValue());
        // Visit each unordered pair once (checking both directions) so a conflict is reported once.
        for (int i = 0; i < comps.size(); i++) {
            for (int j = i + 1; j < comps.size(); j++) {
                String comp1 = comps.get(i);
                String comp2 = comps.get(j);
                boolean incompatible =
                    (constraintSolverConfig.getIncompatibleComponentSets().containsKey(comp1)
                        && constraintSolverConfig.getIncompatibleComponentSets().get(comp1).contains(comp2))
                    || (constraintSolverConfig.getIncompatibleComponentSets().containsKey(comp2)
                        && constraintSolverConfig.getIncompatibleComponentSets().get(comp2).contains(comp1));
                if (incompatible) {
                    errors.add(String.format("IncompatibleComponents: %s and %s on WorkerSlot: %s",
                        comp1, comp2, entry.getKey()));
                }
            }
        }
    }

    // Third check resources, one pass per node hosting executors of this topology.
    Map<String, RasNode> nodes = RasNodes.getAllNodesFrom(cluster);
    Map<String, Collection<ExecutorDetails>> nodeIdToExecs = new HashMap<>();
    for (Map.Entry<ExecutorDetails, WorkerSlot> entry : execToSlot.entrySet()) {
        nodeIdToExecs.computeIfAbsent(entry.getValue().getNodeId(), k -> new HashSet<>()).add(entry.getKey());
    }
    // Summed doubles rarely match exactly; an absolute tolerance avoids flagging pure rounding noise.
    final double resourceTolerance = 1e-6;
    for (Map.Entry<String, Collection<ExecutorDetails>> entry : nodeIdToExecs.entrySet()) {
        String nodeId = entry.getKey();
        Collection<ExecutorDetails> execs = entry.getValue();
        RasNode node = nodes.get(nodeId);
        if (node == null) {
            errors.add(String.format("Unknown Node: Executors %s are assigned to node %s, which was not found",
                execs, nodeId));
            continue;
        }
        if (node.getAvailableMemoryResources() < 0.0) {
            errors.add(String.format("Resource Exhausted: Found node %s with negative available memory %,.2f",
                node.getId(), node.getAvailableMemoryResources()));
            continue;
        }
        if (node.getAvailableCpuResources() < 0.0) {
            errors.add(String.format("Resource Exhausted: Found node %s with negative available CPU %,.2f",
                node.getId(), node.getAvailableCpuResources()));
            continue;
        }
        double cpuUsed = 0.0;
        double memoryUsed = 0.0;
        for (ExecutorDetails exec : execs) {
            cpuUsed += topo.getTotalCpuReqTask(exec);
            memoryUsed += topo.getTotalMemReqTask(exec);
        }
        double expectedCpu = node.getTotalCpuResources() - cpuUsed;
        // Negated "<=" so a NaN on either side is still reported as a mismatch.
        if (!(Math.abs(node.getAvailableCpuResources() - expectedCpu) <= resourceTolerance)) {
            errors.add(String.format("Incorrect CPU Resources: Node %s CPU available is %,.2f, expected %,.2f, "
                    + "Executors scheduled on node: %s",
                node.getId(), node.getAvailableCpuResources(), expectedCpu, execs));
        }
        double expectedMemory = node.getTotalMemoryResources() - memoryUsed;
        if (!(Math.abs(node.getAvailableMemoryResources() - expectedMemory) <= resourceTolerance)) {
            errors.add(String.format("Incorrect Memory Resources: Node %s Memory available is %,.2f, expected %,.2f, "
                    + "Executors scheduled on node: %s",
                node.getId(), node.getAvailableMemoryResources(), expectedMemory, execs));
        }
    }

    if (!errors.isEmpty()) {
        LOG.error("Topology {} solution is invalid\n\t{}", topo.getName(), String.join("\n\t", errors));
    }
    return errors.isEmpty();
}