private void lockingMkAssignments()

in storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java [2537:2673]


    /**
     * Runs one scheduling round and publishes its outcome, all while holding {@code schedLock}.
     *
     * <p>Inside the lock the method, in this order:
     * <ol>
     *   <li>computes the new scheduler assignments and derives the per-topology executor placement
     *       and per-topology worker resources from them;</li>
     *   <li>builds one {@link Assignment} per topology (node-to-host map, executor placement,
     *       executor start times, worker resources, owner);</li>
     *   <li>resets the {@code idToResources} and {@code idToWorkerResources} caches when
     *       {@code auditAssignmentChanges} reports a change;</li>
     *   <li>stores every assignment that is not equal to the existing one through
     *       {@code state.setAssignment};</li>
     *   <li>collects the changed nodes and calls {@code notifySupervisorsAssignments};</li>
     *   <li>collects the newly added slots per topology and hands them to
     *       {@code inimbus.assignSlots}.</li>
     * </ol>
     *
     * @param existingAssignments current assignment per topology id; a topology without an entry is
     *                            treated as having no previous assignment
     * @param bases               passed through to {@code computeNewSchedulerAssignments}
     * @param scratchTopoId       passed through to {@code computeNewSchedulerAssignments}
     * @param assignedTopologyIds passed through to {@code computeTopoToExecToNodePort}
     * @param state               cluster state used to read supervisor details and to store changed
     *                            assignments
     * @param tds                 topology details per topology id; wrapped in a {@link Topologies} and
     *                            used to look up each topology's submitter
     * @throws Exception if any of the invoked steps fails
     */
    private void lockingMkAssignments(Map<String, Assignment> existingAssignments, Map<String, StormBase> bases,
                                      String scratchTopoId, List<String> assignedTopologyIds, IStormClusterState state,
                                      Map<String, TopologyDetails> tds) throws Exception {
        Topologies topologies = new Topologies(tds);

        synchronized (schedLock) {
            Map<String, SchedulerAssignment> scheduled =
                    computeNewSchedulerAssignments(existingAssignments, topologies, bases, scratchTopoId);
            Map<String, Map<List<Long>, List<Object>>> execToSlotByTopo =
                    computeTopoToExecToNodePort(scheduled, assignedTopologyIds);
            Map<String, Map<WorkerSlot, WorkerResources>> slotResourcesByTopo =
                    computeTopoToNodePortToResources(scheduled);
            // A single timestamp is shared by every executor (re)assigned in this round.
            long nowSecs = Time.currentTimeSecs();
            Map<String, SupervisorDetails> supervisors = basicSupervisorDetailsMap(state);

            // Step 1: turn the raw scheduling result into full Assignment objects.
            Map<String, Assignment> newAssignments = new HashMap<>();
            for (Entry<String, Map<List<Long>, List<Object>>> perTopo : execToSlotByTopo.entrySet()) {
                String topoId = perTopo.getKey();
                Map<List<Long>, List<Object>> execToSlot = perTopo.getValue();
                if (execToSlot == null) {
                    execToSlot = new HashMap<>();
                }

                // Element 0 of every [node, port] pair is the node id.
                Set<String> assignedNodes = new HashSet<>();
                for (List<Object> slot : execToSlot.values()) {
                    assignedNodes.add((String) slot.get(0));
                }

                // Seed host names, executor placement and start times from the previous assignment, if any.
                Assignment previous = existingAssignments.get(topoId);
                Map<String, String> knownHosts = new HashMap<>();
                Map<List<Long>, NodeInfo> previousExecToNode = null;
                Map<List<Long>, Long> startTimes = new HashMap<>();
                if (previous != null) {
                    knownHosts.putAll(previous.get_node_host());
                    previousExecToNode = previous.get_executor_node_port();
                    startTimes.putAll(previous.get_executor_start_time_secs());
                }

                // A host name that can be resolved now overrides the one carried over from before.
                for (String node : assignedNodes) {
                    String host = inimbus.getHostName(supervisors, node);
                    if (host != null) {
                        knownHosts.put(node, host);
                    }
                }

                // Executors reported as changed get the current time as their start time.
                for (List<Long> executor : changedExecutors(previousExecToNode, execToSlot)) {
                    startTimes.put(executor, nowSecs);
                }

                Map<WorkerSlot, WorkerResources> slotResources = slotResourcesByTopo.get(topoId);
                if (slotResources == null) {
                    slotResources = new HashMap<>();
                }

                Assignment fresh = new Assignment((String) conf.get(Config.STORM_LOCAL_DIR));

                // Only nodes that are part of the new placement are kept in the node -> host map.
                Map<String, String> hostsInUse = new HashMap<>(knownHosts);
                hostsInUse.keySet().retainAll(assignedNodes);
                fresh.set_node_host(hostsInUse);

                // [node, port] pairs -> NodeInfo, keyed by executor.
                Map<List<Long>, NodeInfo> execToNodeInfo = new HashMap<>();
                for (Entry<List<Long>, List<Object>> placement : execToSlot.entrySet()) {
                    List<Object> slot = placement.getValue();
                    NodeInfo where = new NodeInfo();
                    where.set_node((String) slot.get(0));
                    where.add_to_port((Long) slot.get(1));
                    execToNodeInfo.put(placement.getKey(), where);
                }
                fresh.set_executor_node_port(execToNodeInfo);
                fresh.set_executor_start_time_secs(startTimes);

                // WorkerSlot -> NodeInfo, keeping the resources attached to each slot.
                Map<NodeInfo, WorkerResources> resourcesByNodeInfo = new HashMap<>();
                for (Entry<WorkerSlot, WorkerResources> allotment : slotResources.entrySet()) {
                    WorkerSlot slot = allotment.getKey();
                    NodeInfo where = new NodeInfo();
                    where.set_node(slot.getNodeId());
                    where.add_to_port(slot.getPort());
                    resourcesByNodeInfo.put(where, allotment.getValue());
                }
                fresh.set_worker_resources(resourcesByNodeInfo);

                // NOTE(review): assumes tds has an entry for every topology id produced above;
                // a missing entry would throw a NullPointerException here - confirm with callers.
                fresh.set_owner(tds.get(topoId).getTopologySubmitter());
                newAssignments.put(topoId, fresh);
            }

            // Step 2: drop the cached resource views when the audit reports a change.
            if (auditAssignmentChanges(existingAssignments, newAssignments)) {
                LOG.debug("RESETTING id->resources and id->worker-resources cache!");
                idToResources.set(new HashMap<>());
                idToWorkerResources.set(new HashMap<>());
            }

            // Step 3: store (and log) only the assignments that differ from the existing ones.
            // Tasks figure out what tasks to talk to by looking at the topology at runtime.
            for (Entry<String, Assignment> candidate : newAssignments.entrySet()) {
                String topoId = candidate.getKey();
                Assignment updated = candidate.getValue();
                Assignment before = existingAssignments.get(topoId);
                TopologyDetails details = topologies.getById(topoId);
                if (!updated.equals(before)) {
                    LOG.info("Setting new assignment for topology id {}: {}", topoId, updated);
                    state.setAssignment(topoId, updated, details.getConf());
                } else {
                    LOG.debug("Assignment for {} hasn't changed", topoId);
                }
            }

            // Step 4: gather the nodes whose assignments changed across all topologies, then notify
            // the supervisors in one call so that they are told at almost the same time.
            Map<String, String> changedNodes = new HashMap<>();
            for (Entry<String, Assignment> candidate : newAssignments.entrySet()) {
                Assignment before = existingAssignments.get(candidate.getKey());
                changedNodes.putAll(assignmentChangedNodes(before, candidate.getValue()));
            }
            notifySupervisorsAssignments(newAssignments, assignmentsDistributer, changedNodes,
                                        supervisors, getMetricsRegistry());

            // Step 5: report the slots each topology gained in this round.
            Map<String, Collection<WorkerSlot>> addedSlots = new HashMap<>();
            for (Entry<String, Assignment> candidate : newAssignments.entrySet()) {
                String topoId = candidate.getKey();
                Assignment before = existingAssignments.get(topoId);
                if (before == null) {
                    // No previous assignment: compare against an empty one.
                    before = new Assignment();
                    before.set_executor_node_port(new HashMap<>());
                    before.set_executor_start_time_secs(new HashMap<>());
                }
                Set<WorkerSlot> gained = newlyAddedSlots(before, candidate.getValue());
                addedSlots.put(topoId, gained);
            }
            inimbus.assignSlots(topologies, addedSlots);
        }
    }