private Object loadWorker()

in storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java [211:342]


    /**
     * Builds the {@link WorkerState}, sends the first heartbeat, schedules the worker's recurring timers,
     * creates and starts the executors assigned to this worker, and returns this worker.
     *
     * <p>The order of operations matters: the first heartbeat is sent synchronously before anything else is
     * started, and the heartbeat timers are scheduled before the executors are created so that slow-loading
     * tasks do not cause the worker to time out.
     *
     * @param stateStorage       state storage handed to the {@link WorkerState} constructor
     * @param stormClusterState  cluster state handed to the {@link WorkerState} constructor
     * @param initCreds          credentials passed to every executor created here
     * @param initialCredentials credentials handed to the {@link WorkerState} constructor
     * @return this worker instance
     * @throws Exception if any step of the start-up sequence fails, including the initial heartbeat
     */
    private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
                              Map<String, String> initCreds, Credentials initialCredentials)
        throws Exception {
        workerState =
            new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
                            topologyConf, stateStorage, stormClusterState,
                            autoCreds, metricRegistry, initialCredentials);
        this.heatbeatMeter = metricRegistry.meter("doHeartbeat-calls", workerState.getWorkerTopologyContext(),
                Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);

        // Heartbeat here so that worker process dies if this fails
        // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
        // that worker is running and moves on
        doHeartBeat();

        executorsAtom = new AtomicReference<>(null);

        // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
        // to the supervisor
        workerState.heartbeatTimer
            .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
                try {
                    doHeartBeat();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

        // The executor heartbeat period depends on whether Pacemaker is the state store.
        Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
            ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)
            : (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS);
        workerState.executorHeartbeatTimer
            .scheduleRecurring(0, execHeartBeatFreqSecs,
                               Worker.this::doExecutorHeartbeats);

        workerState.refreshConnections();

        workerState.activateWorkerWhenAllConnectionsReady();

        workerState.refreshStormActive(null);

        workerState.runWorkerStartHooks();

        // Create one executor per local assignment and register its receive queue under each of its task ids.
        // Local mode and cluster mode differ only in which factory builds the executor.
        final boolean localMode = ConfigUtils.isLocalMode(conf);
        List<Executor> execs = new ArrayList<>();
        for (List<Long> e : workerState.getLocalExecutors()) {
            Executor executor = localMode
                ? LocalExecutor.mkExecutor(workerState, e, initCreds)
                : Executor.mkExecutor(workerState, e, initCreds);
            for (int i = 0; i < executor.getTaskIds().size(); ++i) {
                workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
            }
            execs.add(executor);
        }

        // Start the executors only after every receive queue has been registered.
        List<IRunningExecutor> newExecutors = new ArrayList<>();
        for (Executor executor : execs) {
            newExecutors.add(executor.execute());
        }
        executorsAtom.set(newExecutors);

        // This thread will send out messages destined for remote tasks (on other workers)
        // If there are no remote outbound tasks, don't start the thread.
        if (workerState.hasRemoteOutboundTasks()) {
            transferThread = workerState.makeTransferThread();
            transferThread.setName("Worker-Transfer");
        }

        establishLogSettingCallback();

        final int credCheckMaxAllowed = 10;
        // Single-element array so the lambda below can mutate the counter.
        final int[] credCheckErrCnt = new int[1]; // consecutive-error-count

        workerState.refreshCredentialsTimer.scheduleRecurring(0,
                                                              (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), () -> {
                try {
                    checkCredentialsChanged();
                    credCheckErrCnt[0] = 0;
                } catch (Exception ex) {
                    credCheckErrCnt[0]++;
                    if (credCheckErrCnt[0] <= credCheckMaxAllowed) {
                        LOG.warn("Ignoring {} of {} consecutive exceptions when checking for credential change",
                            credCheckErrCnt[0], credCheckMaxAllowed, ex);
                    } else {
                        LOG.error("Received {} consecutive exceptions, {} tolerated, when checking for credential change",
                            credCheckErrCnt[0], credCheckMaxAllowed, ex);
                        throw ex;
                    }
                }
            });

        workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
                (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10),
            () -> {
                try {
                    LOG.debug("Checking if blobs have updated");
                    updateBlobUpdates();
                } catch (IOException e) {
                    // IOException from reading the version files is deliberately not rethrown.
                    // Pass the exception itself to the logger: calling toString() on the
                    // StackTraceElement[] would only log the array's identity, not the trace.
                    LOG.error("Ignoring IOException while checking for updated blobs", e);
                }
            }
        );

        // The jitter allows the clients to get the data at different times, and avoids thundering herd
        if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
            workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
        }

        workerState.refreshConnectionsTimer.scheduleRecurring(0,
                                                              (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
                                                              workerState::refreshConnections);

        workerState.resetLogLevelsTimer.scheduleRecurring(0,
                                                          (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS),
                                                          logConfigManager::resetLogLevels);

        workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
                                                         workerState::refreshStormActive);

        setupFlushTupleTimer(topologyConf, newExecutors);
        setupBackPressureCheckTimer(topologyConf);

        LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf));
        LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
        return this;
    }