in storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java [1028:1107]
public void run() {
    // Main loop of this slot. Until `done` is set, each pass:
    //   1. snapshots the pending profiler requests and collects blob-change notifications,
    //   2. runs one state machine step on the resulting state,
    //   3. saves the current assignment when it changed,
    //   4. deletes profiler requests that the state is not processing,
    //   5. makes the new state the current one.
    try {
        while (!done) {
            // Profiler requests as they were at the start of this pass.
            Set<TopoProfileAction> requestedProfileActions = new HashSet<>(profiling.get());

            Set<BlobChanging> blobsToHandle = dynamicState.changingBlobs;
            if (!changingBlobs.isEmpty()) {
                // Drain into a copy, so dynamicState.changingBlobs itself is not modified.
                blobsToHandle = new HashSet<>(blobsToHandle);
                changingBlobs.drainTo(blobsToHandle);
                //Remove/Clean up changed requests that are not for us
                for (Iterator<BlobChanging> pending = blobsToHandle.iterator(); pending.hasNext(); ) {
                    BlobChanging change = pending.next();
                    boolean forThisSlot = forSameTopology(change.assignment, dynamicState.currentAssignment)
                        || forSameTopology(change.assignment, dynamicState.newAssignment);
                    if (!forThisSlot) {
                        change.latch.countDown(); //Ignore the future
                        pending.remove();
                    }
                }
            }

            // Input of the step: the current state plus everything gathered above.
            DynamicState stepInput = dynamicState
                .withNewAssignment(newAssignment.get())
                .withProfileActions(requestedProfileActions, dynamicState.pendingStopProfileActions)
                .withChangingBlobs(blobsToHandle);
            DynamicState nextState = stateMachineStep(stepInput, staticState);

            // Logged at INFO on every state change, and on every pass when debug logging is on.
            if (LOG.isDebugEnabled() || dynamicState.state != nextState.state) {
                LOG.info("STATE {} -> {}", dynamicState, nextState);
            }

            //Save the current state for recovery
            boolean currentAssignmentChanged =
                (nextState.currentAssignment != null
                    && !nextState.currentAssignment.equals(dynamicState.currentAssignment))
                || (dynamicState.currentAssignment != null
                    && !dynamicState.currentAssignment.equals(nextState.currentAssignment));
            if (currentAssignmentChanged) {
                LOG.info("SLOT {}: Changing current assignment from {} to {}", staticState.port, dynamicState.currentAssignment,
                    nextState.currentAssignment);
                saveNewAssignment(nextState.currentAssignment);
            }

            boolean onlyOwnerIsMissing =
                EquivalenceUtils.areLocalAssignmentsEquivalent(nextState.newAssignment, nextState.currentAssignment)
                && nextState.currentAssignment != null
                && nextState.currentAssignment.get_owner() == null
                && nextState.newAssignment != null
                && nextState.newAssignment.get_owner() != null;
            if (onlyOwnerIsMissing) {
                //This is an odd case for a rolling upgrade where the user on the old assignment may be null,
                // but not on the new one. Although in all other ways they are the same.
                // If this happens we want to use the assignment with the owner.
                LOG.info("Updating assignment to save owner {}", nextState.newAssignment.get_owner());
                saveNewAssignment(nextState.newAssignment);
                nextState = nextState.withCurrentAssignment(nextState.container, nextState.newAssignment);
            }

            // clean up the profiler actions that are not being processed:
            // whatever was requested but is in neither action set of the current state.
            Set<TopoProfileAction> unprocessed = new HashSet<>(requestedProfileActions);
            unprocessed.removeAll(dynamicState.profileActions);
            unprocessed.removeAll(dynamicState.pendingStopProfileActions);
            for (TopoProfileAction action : unprocessed) {
                try {
                    clusterState.deleteTopologyProfileRequests(action.topoId, action.request);
                } catch (Exception deleteFailure) {
                    // Best effort: a failed delete is only logged and the loop keeps going.
                    LOG.error("Error trying to remove profiling request, it will be retried", deleteFailure);
                }
            }

            // Drop the cleaned-up actions from `profiling`, retrying until the compare-and-set
            // succeeds (presumably because other threads may update `profiling` meanwhile).
            while (true) {
                Set<TopoProfileAction> seen = profiling.get();
                Set<TopoProfileAction> remaining = new HashSet<>(seen);
                remaining.removeAll(unprocessed);
                if (profiling.compareAndSet(seen, remaining)) {
                    break;
                }
            }

            dynamicState = nextState;
        }
    } catch (Throwable e) {
        if (Utils.exceptionCauseIsInstanceOf(InterruptedException.class, e)) {
            // An interruption is not treated as an error: just leave the loop.
            return;
        }
        // Anything else is logged and the process is asked to exit with code 20.
        LOG.error("Error when processing event", e);
        Utils.exitProcess(20, "Error when processing an event");
    }
}