public StormClusterStateImpl()

in storm-client/src/jvm/org/apache/storm/cluster/StormClusterStateImpl.java [76:155]


    public StormClusterStateImpl(IStateStorage stateStorage, ILocalAssignmentsBackend assignmentsassignmentsBackend,
                                 ClusterStateContext context, boolean shouldCloseStateStorageOnDisconnect) throws Exception {

        this.stateStorage = stateStorage;
        this.shouldCloseStateStorageOnDisconnect = shouldCloseStateStorageOnDisconnect;
        this.defaultAcls = context.getDefaultZkAcls();
        this.context = context;
        this.assignmentsBackend = assignmentsassignmentsBackend;
        assignmentInfoCallback = new ConcurrentHashMap<>();
        assignmentInfoWithVersionCallback = new ConcurrentHashMap<>();
        assignmentVersionCallback = new ConcurrentHashMap<>();
        supervisorsCallback = new AtomicReference<>();
        backPressureCallback = new ConcurrentHashMap<>();
        leaderInfoCallback = new AtomicReference<>();
        assignmentsCallback = new AtomicReference<>();
        stormBaseCallback = new ConcurrentHashMap<>();
        credentialsCallback = new ConcurrentHashMap<>();
        logConfigCallback = new ConcurrentHashMap<>();
        blobstoreCallback = new AtomicReference<>();

        stateId = this.stateStorage.register(new ZKStateChangedCallback() {

            @Override
            public void changed(Watcher.Event.EventType type, String path) {
                List<String> toks = tokenizePath(path);
                int size = toks.size();
                if (size >= 1) {
                    String root = toks.get(0);
                    if (root.equals(ClusterUtils.ASSIGNMENTS_ROOT)) {
                        if (size == 1) {
                            // set null and get the old value
                            issueCallback(assignmentsCallback);
                        } else {
                            issueMapCallback(assignmentInfoCallback, toks.get(1));
                            issueMapCallback(assignmentVersionCallback, toks.get(1));
                            issueMapCallback(assignmentInfoWithVersionCallback, toks.get(1));
                        }

                    } else if (root.equals(ClusterUtils.SUPERVISORS_ROOT)) {
                        issueCallback(supervisorsCallback);
                    } else if (root.equals(ClusterUtils.BLOBSTORE_ROOT)) {
                        issueCallback(blobstoreCallback);
                    } else if (root.equals(ClusterUtils.STORMS_ROOT) && size > 1) {
                        issueMapCallback(stormBaseCallback, toks.get(1));
                    } else if (root.equals(ClusterUtils.CREDENTIALS_ROOT) && size > 1) {
                        issueMapCallback(credentialsCallback, toks.get(1));
                    } else if (root.equals(ClusterUtils.LOGCONFIG_ROOT) && size > 1) {
                        issueMapCallback(logConfigCallback, toks.get(1));
                    } else if (root.equals(ClusterUtils.BACKPRESSURE_ROOT) && size > 1) {
                        issueMapCallback(backPressureCallback, toks.get(1));
                    } else if (root.equals(ClusterUtils.LEADERINFO_ROOT)) {
                        issueCallback(leaderInfoCallback);
                    } else {
                        LOG.error("{} Unknown callback for subtree {}", new RuntimeException("Unknown callback for this path"), path);
                        Runtime.getRuntime().exit(30);
                    }

                }

                return;
            }

        });

        String[] pathlist = {
            ClusterUtils.ASSIGNMENTS_SUBTREE,
            ClusterUtils.STORMS_SUBTREE,
            ClusterUtils.SUPERVISORS_SUBTREE,
            ClusterUtils.WORKERBEATS_SUBTREE,
            ClusterUtils.ERRORS_SUBTREE,
            ClusterUtils.BLOBSTORE_SUBTREE,
            ClusterUtils.NIMBUSES_SUBTREE,
            ClusterUtils.LOGCONFIG_SUBTREE,
            ClusterUtils.BACKPRESSURE_SUBTREE
        };
        for (String path : pathlist) {
            this.stateStorage.mkdirs(path, defaultAcls);
        }

    }