public void run()

in storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java [1028:1107]


    public void run() {
        try {
            while (!done) {
                Set<TopoProfileAction> origProfileActions = new HashSet<>(profiling.get());

                Set<BlobChanging> changingResourcesToHandle = dynamicState.changingBlobs;
                if (!changingBlobs.isEmpty()) {
                    changingResourcesToHandle = new HashSet<>(changingResourcesToHandle);
                    changingBlobs.drainTo(changingResourcesToHandle);
                    Iterator<BlobChanging> it = changingResourcesToHandle.iterator();

                    //Remove/Clean up changed requests that are not for us
                    while (it.hasNext()) {
                        BlobChanging rc = it.next();
                        if (!forSameTopology(rc.assignment, dynamicState.currentAssignment)
                                && !forSameTopology(rc.assignment, dynamicState.newAssignment)) {
                            rc.latch.countDown(); //Ignore the future
                            it.remove();
                        }
                    }
                }

                DynamicState nextState =
                    stateMachineStep(dynamicState.withNewAssignment(newAssignment.get())
                                                 .withProfileActions(origProfileActions, dynamicState.pendingStopProfileActions)
                                                 .withChangingBlobs(changingResourcesToHandle), staticState);

                if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) {
                    LOG.info("STATE {} -> {}", dynamicState, nextState);
                }
                //Save the current state for recovery
                if ((nextState.currentAssignment != null
                                && !nextState.currentAssignment.equals(dynamicState.currentAssignment))
                        || (dynamicState.currentAssignment != null
                                && !dynamicState.currentAssignment.equals(nextState.currentAssignment))) {
                    LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment,
                             nextState.currentAssignment);
                    saveNewAssignment(nextState.currentAssignment);
                }

                if (EquivalenceUtils.areLocalAssignmentsEquivalent(nextState.newAssignment, nextState.currentAssignment)
                        && nextState.currentAssignment != null
                        && nextState.currentAssignment.get_owner() == null
                        && nextState.newAssignment != null
                        && nextState.newAssignment.get_owner() != null) {
                    //This is an odd case for a rolling upgrade where the user on the old assignment may be null,
                    // but not on the new one.  Although in all other ways they are the same.
                    // If this happens we want to use the assignment with the owner.
                    LOG.info("Updating assignment to save owner {}", nextState.newAssignment.get_owner());
                    saveNewAssignment(nextState.newAssignment);
                    nextState = nextState.withCurrentAssignment(nextState.container, nextState.newAssignment);
                }

                // clean up the profiler actions that are not being processed
                Set<TopoProfileAction> removed = new HashSet<>(origProfileActions);
                removed.removeAll(dynamicState.profileActions);
                removed.removeAll(dynamicState.pendingStopProfileActions);
                for (TopoProfileAction action : removed) {
                    try {
                        clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
                    } catch (Exception e) {
                        LOG.error("Error trying to remove profiling request, it will be retried", e);
                    }
                }
                Set<TopoProfileAction> orig;
                Set<TopoProfileAction> copy;
                do {
                    orig = profiling.get();
                    copy = new HashSet<>(orig);
                    copy.removeAll(removed);
                } while (!profiling.compareAndSet(orig, copy));
                dynamicState = nextState;
            }
        } catch (Throwable e) {
            if (!Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
                LOG.error("Error when processing event", e);
                Utils.exitProcess(20, "Error when processing an event");
            }
        }
    }