in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [2537:2673]
/**
 * Computes fresh assignments for the scheduled topologies and publishes them, holding {@code schedLock}
 * for the whole operation.
 *
 * <p>In order, this method:
 * <ol>
 *   <li>asks {@code computeNewSchedulerAssignments} for the new scheduler assignments and derives the
 *       executor placement and worker resources from them;</li>
 *   <li>builds one {@link Assignment} per topology (node to host, executor to node/port, executor start
 *       times, worker resources and owner);</li>
 *   <li>resets the {@code idToResources} / {@code idToWorkerResources} caches when
 *       {@code auditAssignmentChanges} reports a change;</li>
 *   <li>stores, through {@code state}, every assignment that is not equal to the existing one;</li>
 *   <li>notifies supervisors about the nodes whose assignments changed;</li>
 *   <li>passes the newly added slots to {@code inimbus.assignSlots}.</li>
 * </ol>
 *
 * @param existingAssignments current assignment per topology id; a missing entry means no prior assignment
 * @param bases forwarded to {@code computeNewSchedulerAssignments}
 * @param scratchTopoId forwarded to {@code computeNewSchedulerAssignments}
 * @param assignedTopologyIds forwarded to {@code computeTopoToExecToNodePort}
 * @param state cluster state used to look up supervisors and to persist changed assignments
 * @param tds topology details per topology id; dereferenced without a null check for every topology that
 *            receives an assignment
 * @throws Exception propagated from the scheduling and cluster-state calls made here
 */
private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
                                  String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
                                  Map<String, TopologyDetails> tds) throws Exception {
    Topologies topologies = new Topologies(tds);
    synchronized (schedLock) {
        Map<String, SchedulerAssignment> schedulerAssignments =
            computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
        Map<String, Map<List<Long>, List<Object>>> execToNodePortByTopo =
            computeTopoToExecToNodePort(schedulerAssignments, assignedTopologyIds);
        Map<String, Map<WorkerSlot, WorkerResources>> slotResourcesByTopo =
            computeTopoToNodePortToResources(schedulerAssignments);
        int nowSecs = Time.currentTimeSecs();
        Map<String, SupervisorDetails> supervisorDetailsById = basicSupervisorDetailsMap(state);

        // Turn the scheduler output into full Assignment objects, one per topology.
        Map<String, Assignment> newAssignments = new HashMap<>();
        for (Entry<String, Map<List<Long>, List<Object>>> topoEntry : execToNodePortByTopo.entrySet()) {
            String topoId = topoEntry.getKey();
            Map<List<Long>, List<Object>> execToNodePort =
                topoEntry.getValue() != null ? topoEntry.getValue() : new HashMap<>();

            // One pass over the placement: collect the nodes in use and build the executor -> NodeInfo map.
            Set<String> assignedNodes = new HashSet<>();
            Map<List<Long>, NodeInfo> execToNodeInfo = new HashMap<>();
            for (Entry<List<Long>, List<Object>> execEntry : execToNodePort.entrySet()) {
                List<Object> nodePort = execEntry.getValue();
                String nodeId = (String) nodePort.get(0);
                assignedNodes.add(nodeId);
                NodeInfo info = new NodeInfo();
                info.set_node(nodeId);
                info.add_to_port((Long) nodePort.get(1));
                execToNodeInfo.put(execEntry.getKey(), info);
            }

            // node -> host: start from the previous assignment, then overlay whatever inimbus can resolve now.
            Assignment previous = existingAssignments.get(topoId);
            Map<String, String> knownNodeToHost = new HashMap<>();
            if (previous != null) {
                knownNodeToHost.putAll(previous.get_node_host());
            }
            for (String nodeId : assignedNodes) {
                String host = inimbus.getHostName(supervisorDetailsById, nodeId);
                if (host != null) {
                    knownNodeToHost.put(nodeId, host);
                }
            }
            // Only nodes that carry an executor in the new placement are kept.
            Map<String, String> assignedNodeToHost = new HashMap<>(knownNodeToHost);
            assignedNodeToHost.keySet().retainAll(assignedNodes);

            // Start times: carry over the previous ones, and stamp "now" on every executor that changed.
            Map<List<Long>, NodeInfo> previousExecToNodeInfo =
                previous == null ? null : previous.get_executor_node_port();
            List<List<Long>> changed = changedExecutors(previousExecToNodeInfo, execToNodePort);
            Map<List<Long>, Long> startTimes = new HashMap<>();
            if (previous != null) {
                startTimes.putAll(previous.get_executor_start_time_secs());
            }
            for (List<Long> executor : changed) {
                startTimes.put(executor, (long) nowSecs);
            }

            // Worker resources, re-keyed from WorkerSlot to NodeInfo; none recorded means an empty map.
            Map<NodeInfo, WorkerResources> workerResources = new HashMap<>();
            Map<WorkerSlot, WorkerResources> slotToResources = slotResourcesByTopo.get(topoId);
            if (slotToResources != null) {
                for (Entry<WorkerSlot, WorkerResources> slotEntry : slotToResources.entrySet()) {
                    WorkerSlot slot = slotEntry.getKey();
                    NodeInfo info = new NodeInfo();
                    info.set_node(slot.getNodeId());
                    info.add_to_port(slot.getPort());
                    workerResources.put(info, slotEntry.getValue());
                }
            }

            Assignment built = new Assignment((String) conf.get(Config.STORM_LOCAL_DIR));
            built.set_node_host(assignedNodeToHost);
            built.set_executor_node_port(execToNodeInfo);
            built.set_executor_start_time_secs(startTimes);
            built.set_worker_resources(workerResources);
            built.set_owner(tds.get(topoId).getTopologySubmitter());
            newAssignments.put(topoId, built);
        }

        if (auditAssignmentChanges(existingAssignments, newAssignments)) {
            LOG.debug("RESETTING id->resources and id->worker-resources cache!");
            idToResources.set(new HashMap<>());
            idToWorkerResources.set(new HashMap<>());
        }

        // Persist only the assignments that differ from what is already stored.
        for (Entry<String, Assignment> e : newAssignments.entrySet()) {
            String topoId = e.getKey();
            Assignment updated = e.getValue();
            if (updated.equals(existingAssignments.get(topoId))) {
                LOG.debug("Assignment for {} hasn't changed", topoId);
                continue;
            }
            LOG.info("Setting new assignment for topology id {}: {}", topoId, updated);
            state.setAssignment(topoId, updated, topologies.getById(topoId).getConf());
        }

        // Collect the nodes whose assignments changed across all topologies, then notify their
        // supervisors in a single call.
        Map<String, String> changedNodes = new HashMap<>();
        for (Entry<String, Assignment> e : newAssignments.entrySet()) {
            changedNodes.putAll(assignmentChangedNodes(existingAssignments.get(e.getKey()), e.getValue()));
        }
        notifySupervisorsAssignments(newAssignments, assignmentsDistributer, changedNodes,
                                     supervisorDetailsById, getMetricsRegistry());

        // Work out which slots are new per topology; a topology without a prior assignment is compared
        // against an empty one.
        Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
        for (Entry<String, Assignment> e : newAssignments.entrySet()) {
            String topoId = e.getKey();
            Assignment previous = existingAssignments.get(topoId);
            if (previous == null) {
                previous = new Assignment();
                previous.set_executor_node_port(new HashMap<>());
                previous.set_executor_start_time_secs(new HashMap<>());
            }
            addedSlots.put(topoId, newlyAddedSlots(previous, e.getValue()));
        }
        inimbus.assignSlots(topologies, addedSlots);
    }
}