in storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java [211:342]
/**
 * Builds the {@link WorkerState} for this worker, sends the first heartbeat, schedules the
 * recurring worker timers, creates and starts the local executors, and optionally starts the
 * transfer thread for remote outbound tasks.
 *
 * @param stateStorage       state storage handed to the new {@code WorkerState}
 * @param stormClusterState  cluster state handed to the new {@code WorkerState}
 * @param initCreds          credentials passed to every executor that is created
 * @param initialCredentials credentials handed to the new {@code WorkerState}
 * @return this worker instance
 * @throws Exception if any step of the loading sequence fails (including the initial heartbeat)
 */
private Object loadWorker(IStateStorage stateStorage, IStormClusterState stormClusterState,
                          Map<String, String> initCreds, Credentials initialCredentials)
    throws Exception {
    workerState =
        new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
                        topologyConf, stateStorage, stormClusterState,
                        autoCreds, metricRegistry, initialCredentials);
    this.heatbeatMeter = metricRegistry.meter("doHeartbeat-calls", workerState.getWorkerTopologyContext(),
                                              Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);

    // Heartbeat here so that worker process dies if this fails
    // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
    // that worker is running and moves on
    doHeartBeat();

    executorsAtom = new AtomicReference<>(null);

    // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
    // to the supervisor
    workerState.heartbeatTimer
        .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
            try {
                doHeartBeat();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });

    // The executor heartbeat period comes from a different config key depending on whether the
    // cluster state reports a Pacemaker state store.
    Integer execHeartBeatFreqSecs = workerState.stormClusterState.isPacemakerStateStore()
        ? (Integer) conf.get(Config.TASK_HEARTBEAT_FREQUENCY_SECS)
        : (Integer) conf.get(Config.EXECUTOR_METRICS_FREQUENCY_SECS);
    workerState.executorHeartbeatTimer
        .scheduleRecurring(0, execHeartBeatFreqSecs,
                           Worker.this::doExecutorHeartbeats);

    workerState.refreshConnections();
    workerState.activateWorkerWhenAllConnectionsReady();
    workerState.refreshStormActive(null);
    workerState.runWorkerStartHooks();

    // Create one executor per local executor assignment and register its receive queue under
    // each of its task ids. Only the factory differs between local mode and cluster mode.
    List<Executor> execs = new ArrayList<>();
    for (List<Long> e : workerState.getLocalExecutors()) {
        Executor executor = ConfigUtils.isLocalMode(conf)
            ? LocalExecutor.mkExecutor(workerState, e, initCreds)
            : Executor.mkExecutor(workerState, e, initCreds);
        for (int i = 0; i < executor.getTaskIds().size(); ++i) {
            workerState.localReceiveQueues.put(executor.getTaskIds().get(i), executor.getReceiveQueue());
        }
        execs.add(executor);
    }

    // Start the executors only after every receive queue has been registered above.
    List<IRunningExecutor> newExecutors = new ArrayList<>();
    for (Executor executor : execs) {
        newExecutors.add(executor.execute());
    }
    executorsAtom.set(newExecutors);

    // This thread will send out messages destined for remote tasks (on other workers)
    // If there are no remote outbound tasks, don't start the thread.
    if (workerState.hasRemoteOutboundTasks()) {
        transferThread = workerState.makeTransferThread();
        transferThread.setName("Worker-Transfer");
    }

    establishLogSettingCallback();

    final int credCheckMaxAllowed = 10;
    final int[] credCheckErrCnt = new int[1]; // consecutive-error-count
    workerState.refreshCredentialsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), () -> {
            try {
                checkCredentialsChanged();
                // A successful check resets the consecutive-failure counter.
                credCheckErrCnt[0] = 0;
            } catch (Exception ex) {
                credCheckErrCnt[0]++;
                if (credCheckErrCnt[0] <= credCheckMaxAllowed) {
                    LOG.warn("Ignoring {} of {} consecutive exceptions when checking for credential change",
                             credCheckErrCnt[0], credCheckMaxAllowed, ex);
                } else {
                    LOG.error("Received {} consecutive exceptions, {} tolerated, when checking for credential change",
                              credCheckErrCnt[0], credCheckMaxAllowed, ex);
                    throw ex;
                }
            }
        });

    workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10),
        () -> {
            try {
                LOG.debug("Checking if blobs have updated");
                updateBlobUpdates();
            } catch (IOException e) {
                // IOException from reading the version files to be ignored.
                // Pass the exception itself to the logger so that its type, message and stack
                // trace are recorded; calling toString() on the stack trace array would only
                // log the array's identity string.
                LOG.error("Ignoring IOException while checking for updated blobs", e);
            }
        }
    );

    // The jitter allows the clients to get the data at different times, and avoids thundering herd
    if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
        workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
    }

    workerState.refreshConnectionsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
        workerState::refreshConnections);

    workerState.resetLogLevelsTimer.scheduleRecurring(0,
        (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS),
        logConfigManager::resetLogLevels);

    workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
        workerState::refreshStormActive);

    setupFlushTupleTimer(topologyConf, newExecutors);
    setupBackPressureCheckTimer(topologyConf);

    LOG.info("Worker has topology config {}", ConfigUtils.maskPasswords(topologyConf));
    LOG.info("Worker {} for storm {} on {}:{} has finished loading", workerId, topologyId, assignmentId, port);
    return this;
}