in storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java [768:884]
/**
 * State handler invoked while the slot has a container in place and a current assignment.
 *
 * <p>The checks run in this order, and the first one that fires ends the call by delegating to
 * {@code killContainerFor} with the matching {@code KillReason}:
 * <ol>
 *   <li>the new assignment is not equivalent to the current one ({@code ASSIGNMENT_CHANGED});</li>
 *   <li>blobs relevant to the current assignment have changed ({@code BLOB_CHANGED});</li>
 *   <li>the container reports that its main process exited ({@code PROCESS_EXIT});</li>
 *   <li>the container reports a memory limit violation ({@code MEMORY_VIOLATION});</li>
 *   <li>no heartbeat could be read ({@code HB_NULL});</li>
 *   <li>the heartbeat is older than the heartbeat timeout ({@code HB_TIMEOUT}).</li>
 * </ol>
 * If none fires, pending profiling requests are serviced, container metrics are processed, and the
 * call sleeps for {@code staticState.monitorFreqMs} before returning.
 *
 * @param dynamicState the current mutable-by-copy state of the slot; must have a container and a
 *     current assignment, and no pending blob/localization/download bookkeeping
 * @param staticState the fixed configuration of the slot (port, monitor frequency, metrics hooks)
 * @return the state to use for the next iteration: either the result of {@code killContainerFor}
 *     or the (possibly updated) running state
 * @throws Exception if one of the entry preconditions on {@code dynamicState} does not hold, or
 *     if one of the invoked helpers/container operations throws
 */
private static DynamicState handleRunning(DynamicState dynamicState, StaticState staticState) throws Exception {
    // Entry preconditions: a running slot must own a container and an assignment, and must not
    // carry any leftover "pending" bookkeeping.
    if (dynamicState.container == null) {
        throw new Exception("dynamicState.container is null");
    }
    if (dynamicState.currentAssignment == null) {
        throw new Exception("dynamicState.currentAssignment is null");
    }
    if (!dynamicState.pendingChangingBlobs.isEmpty()) {
        throw new Exception("dynamicState.pendingChangingBlobs is not empty");
    }
    if (dynamicState.pendingChangingBlobsAssignment != null) {
        throw new Exception("dynamicState.pendingChangingBlobsAssignment is not null");
    }
    if (dynamicState.pendingLocalization != null) {
        throw new Exception("dynamicState.pendingLocalization is not null");
    }
    if (dynamicState.pendingDownload != null) {
        throw new Exception("dynamicState.pendingDownload is not null");
    }

    if (!EquivalenceUtils.areLocalAssignmentsEquivalent(dynamicState.newAssignment, dynamicState.currentAssignment)) {
        LOG.info("SLOT {}: Assignment Changed from {} to {}", staticState.port, dynamicState.currentAssignment,
                 dynamicState.newAssignment);
        //Scheduling changed while running...
        return killContainerFor(KillReason.ASSIGNMENT_CHANGED, dynamicState, staticState);
    }
    dynamicState = updateAssignmentIfNeeded(dynamicState);

    dynamicState = filterChangingBlobsFor(dynamicState, dynamicState.currentAssignment);
    if (!dynamicState.changingBlobs.isEmpty()) {
        //Kill the container and restart it
        return killContainerFor(KillReason.BLOB_CHANGED, dynamicState, staticState);
    }

    if (dynamicState.container.didMainProcessExit()) {
        LOG.warn("SLOT {}: main process has exited for topology: {}",
                 staticState.port, dynamicState.currentAssignment.get_topology_id());
        return killContainerFor(KillReason.PROCESS_EXIT, dynamicState, staticState);
    }

    if (dynamicState.container.isMemoryLimitViolated(dynamicState.currentAssignment)) {
        LOG.warn("SLOT {}: violated memory limits for topology: {}",
                 staticState.port, dynamicState.currentAssignment.get_topology_id());
        return killContainerFor(KillReason.MEMORY_VIOLATION, dynamicState, staticState);
    }

    LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat();
    if (hb == null) {
        LOG.warn("SLOT {}: HB returned as null for topology: {}",
                 staticState.port, dynamicState.currentAssignment.get_topology_id());
        //This can happen if the supervisor crashed after launching a
        // worker that never came up.
        return killContainerFor(KillReason.HB_NULL, dynamicState, staticState);
    }

    // Multiply by a long constant so the seconds -> millis conversion is done in 64-bit arithmetic.
    // NOTE(review): if both second counts are ints (their declarations are not visible here),
    // "* 1000" would wrap for a heartbeat older than ~24.8 days, could go negative, and the
    // staleness check below would then be skipped.
    long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000L;
    long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState);
    if (timeDiffMs > hbTimeoutMs) {
        LOG.warn("SLOT {}: HB is too old {} > {} for topology: {}",
                 staticState.port, timeDiffMs, hbTimeoutMs, dynamicState.currentAssignment.get_topology_id());
        return killContainerFor(KillReason.HB_TIMEOUT, dynamicState, staticState);
    }

    //The worker is up and running check for profiling requests
    if (!dynamicState.profileActions.isEmpty()) {
        // Work on copies; the updated sets are installed through withProfileActions below.
        HashSet<TopoProfileAction> mod = new HashSet<>(dynamicState.profileActions);
        HashSet<TopoProfileAction> modPending = new HashSet<>(dynamicState.pendingStopProfileActions);
        Iterator<TopoProfileAction> iter = mod.iterator();
        while (iter.hasNext()) {
            TopoProfileAction action = iter.next();
            if (!action.topoId.equals(dynamicState.currentAssignment.get_topology_id())) {
                iter.remove();
                LOG.warn("Dropping {} wrong topology is running", action);
                //Not for this topology so skip it
            } else if (modPending.contains(action)) {
                // Already started earlier; stop it once its time stamp has passed.
                boolean isTimeForStop = Time.currentTimeMillis() > action.request.get_time_stamp();
                if (isTimeForStop) {
                    if (dynamicState.container.runProfiling(action.request, true)) {
                        LOG.debug("Stopped {} action finished", action);
                        iter.remove();
                        modPending.remove(action);
                    } else {
                        // Left in both sets so the stop is attempted again on a later pass.
                        LOG.warn("Stopping {} failed, will be retried", action);
                    }
                } else {
                    LOG.debug("Still pending {} now: {}", action, Time.currentTimeMillis());
                }
            } else if (action.request.get_action() == ProfileAction.JPROFILE_STOP) {
                //J_PROFILE_START is not used. When you see a J_PROFILE_STOP
                // start profiling and save it away to stop when timeout happens
                if (dynamicState.container.runProfiling(action.request, false)) {
                    modPending.add(action);
                    LOG.debug("Started {} now: {}", action, Time.currentTimeMillis());
                } else {
                    LOG.warn("Starting {} failed, will be retried", action);
                }
            } else {
                // Any other action is one-shot: run it and drop it on success.
                if (dynamicState.container.runProfiling(action.request, false)) {
                    LOG.debug("Started {} action finished", action);
                    iter.remove();
                } else {
                    LOG.warn("Starting {} failed, will be retried", action);
                }
            }
        }
        dynamicState = dynamicState.withProfileActions(mod, modPending);
    }

    dynamicState.container.processMetrics(staticState.metricsExec, staticState.metricsProcessor);

    // Pace the monitoring loop before handing the state back.
    Time.sleep(staticState.monitorFreqMs);
    return dynamicState;
}