in storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java [76:155]
public StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend,
ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception {
this.stateStorage = stateStorage;
this.shouldCloseStateStorageOnDisconnect = shouldCloseStateStorageOnDisconnect;
this.defaultAcls = context.getDefaultZkAcls();
this.context = context;
this.assignmentsBackend = assignmentsassignmentsBackend;
assignmentInfoCallback = new ConcurrentHashMap<>();
assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
assignmentVersionCallback = new ConcurrentHashMap<>();
supervisorsCallback = new AtomicReference<>();
backPressureCallback = new ConcurrentHashMap<>();
leaderInfoCallback = new AtomicReference<>();
assignmentsCallback = new AtomicReference<>();
stormBaseCallback = new ConcurrentHashMap<>();
credentialsCallback = new ConcurrentHashMap<>();
logConfigCallback = new ConcurrentHashMap<>();
blobstoreCallback = new AtomicReference<>();
stateId = this.stateStorage.register(new ZKStateChangedCallback() {
@Override
public void changed(Watcher.Event.EventType type, String path) {
List<String> toks = tokenizePath(path);
int size = toks.size();
if (size >= 1) {
String root = toks.get(0);
if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
if (size == 1) {
// set null and get the old value
issueCallback(assignmentsCallback);
} else {
issueMapCallback(assignmentInfoCallback, toks.get(1));
issueMapCallback(assignmentVersionCallback, toks.get(1));
issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
}
} else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
issueCallback(supervisorsCallback);
} else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
issueCallback(blobstoreCallback);
} else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
issueMapCallback(stormBaseCallback, toks.get(1));
} else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
issueMapCallback(credentialsCallback, toks.get(1));
} else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
issueMapCallback(logConfigCallback, toks.get(1));
} else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
issueMapCallback(backPressureCallback, toks.get(1));
} else if (root.equals(ClusterUtils.LEADERINFO_ROOT)) {
issueCallback(leaderInfoCallback);
} else {
LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
Runtime.getRuntime().exit(30);
}
}
return;
}
});
String[] pathlist = {
ClusterUtils.ASSIGNMENTS_SUBTREE,
ClusterUtils.STORMS_SUBTREE,
ClusterUtils.SUPERVISORS_SUBTREE,
ClusterUtils.WORKERBEATS_SUBTREE,
ClusterUtils.ERRORS_SUBTREE,
ClusterUtils.BLOBSTORE_SUBTREE,
ClusterUtils.NIMBUSES_SUBTREE,
ClusterUtils.LOGCONFIG_SUBTREE,
ClusterUtils.BACKPRESSURE_SUBTREE
};
for (String path : pathlist) {
this.stateStorage.mkdirs(path, defaultAcls);
}
}