public WorkerState()

in storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java [159:249]


    /**
     * Builds the state of a single worker: stores the supplied collaborators, derives the executor and task
     * lookup structures from the topology, and finally binds the messaging receiver on this worker's port.
     *
     * <p>The assignments below are kept in their original order on purpose. NOTE(review): the no-arg helpers
     * called here ({@code makeDefaultResources}, {@code makeUserResources}, {@code workerOutboundTasks},
     * {@code deserializeWorkerHooks}, {@code getWorkerTopologyContext}) presumably read fields assigned
     * earlier in this constructor - confirm before reordering anything.
     *
     * @param conf                    configuration stored on this instance and used to read the topology
     * @param mqContext               messaging context; when {@code null} one is created through
     *                                {@code TransportFactory.makeContext(topologyConf, metricRegistry)}
     * @param topologyId              id of the topology; used to look up the local assignment, to read the
     *                                topology and to bind the receiver
     * @param assignmentId            assignment id passed, together with {@code port}, to
     *                                {@code readWorkerExecutors}
     * @param supervisorIfaceSupplier supplier stored on this instance
     * @param port                    port used to select this worker's executors and to bind the receiver
     * @param workerId                id of this worker; also handed to the back pressure tracker
     * @param topologyConf            topology configuration used to build task info, receive queues, the
     *                                system topology, the worker transfer and the connection callback
     * @param stateStorage            state storage stored on this instance
     * @param stormClusterState       cluster state used to fetch the local assignment
     * @param autoCredentials         auto-credential plugins stored on this instance
     * @param metricRegistry          metric registry stored on this instance and passed to the messaging
     *                                context factory and the back pressure tracker
     * @param initialCredentials      initial value of the credentials reference
     * @throws IOException              declared by the signature; the raising call is not identifiable from
     *                                  this constructor alone
     * @throws InvalidTopologyException declared by the signature; the raising call is not identifiable from
     *                                  this constructor alone
     */
    public WorkerState(Map<String, Object> conf,
            IContext mqContext,
            String topologyId,
            String assignmentId,
            Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier,
            int port,
            String workerId,
            Map<String, Object> topologyConf,
            IStateStorage stateStorage,
            IStormClusterState stormClusterState,
            Collection<IAutoCredentials> autoCredentials,
            StormMetricRegistry metricRegistry,
            Credentials initialCredentials) throws IOException,
            InvalidTopologyException {
        this.metricRegistry = metricRegistry;
        this.autoCredentials = autoCredentials;
        // Diamond instead of the raw type so the reference stays type-checked.
        this.credentialsAtom = new AtomicReference<>(initialCredentials);
        this.conf = conf;
        this.supervisorIfaceSupplier = supervisorIfaceSupplier;
        // Fall back to a freshly created messaging context when the caller did not supply one.
        this.mqContext = (null != mqContext) ? mqContext :
                TransportFactory.makeContext(topologyConf, metricRegistry);
        this.topologyId = topologyId;
        this.assignmentId = assignmentId;
        this.port = port;
        this.workerId = workerId;
        this.stateStorage = stateStorage;
        this.stormClusterState = stormClusterState;
        this.localExecutors =
            new HashSet<>(readWorkerExecutors(assignmentId, port, getLocalAssignment(this.stormClusterState, topologyId)));
        this.isWorkerActive = new CountDownLatch(1);
        this.isTopologyActive = new AtomicBoolean(false);
        this.stormComponentToDebug = new AtomicReference<>();
        this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
        this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
        this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors, taskToComponent);
        this.localTaskIds = new ArrayList<>();
        this.taskToExecutorQueue = new HashMap<>();
        this.blobToLastKnownVersion = new ConcurrentHashMap<>();
        // Every task of an executor shares that executor's receive queue.
        for (Map.Entry<List<Long>, JCQueue> entry : executorReceiveQueueMap.entrySet()) {
            List<Integer> taskIds = StormCommon.executorIdToTasks(entry.getKey());
            for (Integer taskId : taskIds) {
                this.taskToExecutorQueue.put(taskId, entry.getValue());
            }
            this.localTaskIds.addAll(taskIds);
        }
        Collections.sort(localTaskIds);
        this.topologyConf = topologyConf;
        this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
        // component id -> stream id -> declared output fields, for every component of the system topology.
        this.componentToStreamToFields = new HashMap<>();
        for (String c : ThriftTopologyUtils.getComponentIds(systemTopology)) {
            Map<String, Fields> streamToFields = new HashMap<>();
            for (Map.Entry<String, StreamInfo> stream :
                ThriftTopologyUtils.getComponentCommon(systemTopology, c).get_streams().entrySet()) {
                streamToFields.put(stream.getKey(), new Fields(stream.getValue().get_output_fields()));
            }
            componentToStreamToFields.put(c, streamToFields);
        }
        this.componentToSortedTasks = Utils.reverseMap(taskToComponent);
        this.componentToSortedTasks.values().forEach(Collections::sort);
        this.endpointSocketLock = new ReentrantReadWriteLock();
        this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
        this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
        this.cachedNodeToHost = new AtomicReference<>(new HashMap<>());
        this.suicideCallback = Utils.mkSuicideFn();
        this.uptime = Utils.makeUptimeComputer();
        this.defaultSharedResources = makeDefaultResources();
        this.userSharedResources = makeUserResources();
        this.loadMapping = new LoadMapping();
        this.assignmentVersions = new AtomicReference<>(new HashMap<>());
        this.outboundTasks = workerOutboundTasks();
        // Single lookup; a key that is absent or explicitly mapped to null counts as "disabled" instead of
        // failing with a NullPointerException while unboxing.
        Object alwaysTrySerialize = topologyConf.get(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
        this.trySerializeLocal = alwaysTrySerialize != null && (Boolean) alwaysTrySerialize;
        if (trySerializeLocal) {
            LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production)", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
        }
        int maxTaskId = getMaxTaskId(componentToSortedTasks);
        // `this` is handed out here before bpTracker, deserializedWorkerHooks and receiver are assigned.
        this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);

        this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue, metricRegistry, taskToComponent);
        this.deserializedWorkerHooks = deserializeWorkerHooks();
        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
        IConnectionCallback cb = new DeserializingConnectionCallback(topologyConf,
            getWorkerTopologyContext(),
            this::transferLocalBatch);
        // Supplies the current back pressure status as the response for each new connection.
        Supplier<Object> newConnectionResponse = () -> {
            BackPressureStatus bpStatus = bpTracker.getCurrStatus();
            LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
            return bpStatus;
        };
        // Bind last, once everything the callbacks reference has been assigned.
        this.receiver = this.mqContext.bind(topologyId, port, cb, newConnectionResponse);
    }