in storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java [159:249]
/**
 * Creates the state of a single worker: stores the supplied collaborators, reads the topology,
 * derives the task/component/stream lookup tables, creates the executor receive queues and finally
 * binds the receiving connection on {@code port}.
 *
 * <p>NOTE(review): several instance helpers invoked below (for example {@code makeDefaultResources()},
 * {@code workerOutboundTasks()}, {@code deserializeWorkerHooks()} and {@code getWorkerTopologyContext()})
 * presumably read fields assigned earlier in this constructor, so the statement order should be
 * preserved when editing.
 *
 * @param conf                    configuration stored on the worker and used to read the topology
 * @param mqContext               messaging context to use; when {@code null} one is created through
 *                                {@code TransportFactory.makeContext(topologyConf, metricRegistry)}
 * @param topologyId              id of the topology this worker belongs to
 * @param assignmentId            assignment id, used together with {@code port} to find the local executors
 * @param supervisorIfaceSupplier supplier of the supervisor interface factory (only stored here)
 * @param port                    port of this worker; the receiver is bound on it
 * @param workerId                id of this worker
 * @param topologyConf            topology configuration
 * @param stateStorage            state storage (only stored here)
 * @param stormClusterState       cluster state, used to look up the local assignment
 * @param autoCredentials         auto-credential plugins (only stored here)
 * @param metricRegistry          metric registry handed to the messaging context and back pressure tracker
 * @param initialCredentials      initial value of the credentials reference
 * @throws IOException              propagated from the helpers invoked during construction
 * @throws InvalidTopologyException propagated from the helpers invoked during construction
 */
public WorkerState(Map<String, Object> conf,
                   IContext mqContext,
                   String topologyId,
                   String assignmentId,
                   Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier,
                   int port,
                   String workerId,
                   Map<String, Object> topologyConf,
                   IStateStorage stateStorage,
                   IStormClusterState stormClusterState,
                   Collection<IAutoCredentials> autoCredentials,
                   StormMetricRegistry metricRegistry,
                   Credentials initialCredentials) throws IOException,
    InvalidTopologyException {
    this.metricRegistry = metricRegistry;
    this.autoCredentials = autoCredentials;
    // Diamond instead of the raw type, so the element type is checked by the compiler.
    this.credentialsAtom = new AtomicReference<>(initialCredentials);
    this.conf = conf;
    this.supervisorIfaceSupplier = supervisorIfaceSupplier;
    // Fall back to a freshly created context when the caller did not supply one.
    this.mqContext = (null != mqContext) ? mqContext :
        TransportFactory.makeContext(topologyConf, metricRegistry);
    this.topologyId = topologyId;
    this.assignmentId = assignmentId;
    this.port = port;
    this.workerId = workerId;
    this.stateStorage = stateStorage;
    this.stormClusterState = stormClusterState;
    // Executors of the local assignment that belong to this assignment id / port.
    this.localExecutors =
        new HashSet<>(readWorkerExecutors(assignmentId, port, getLocalAssignment(this.stormClusterState, topologyId)));
    this.isWorkerActive = new CountDownLatch(1);
    this.isTopologyActive = new AtomicBoolean(false);
    this.stormComponentToDebug = new AtomicReference<>();
    this.topology = ConfigUtils.readSupervisorTopology(conf, topologyId, AdvancedFSOps.make(conf));
    this.taskToComponent = StormCommon.stormTaskInfo(topology, topologyConf);
    this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors, taskToComponent);
    this.localTaskIds = new ArrayList<>();
    this.taskToExecutorQueue = new HashMap<>();
    this.blobToLastKnownVersion = new ConcurrentHashMap<>();
    // Every task of an executor shares that executor's receive queue.
    for (Map.Entry<List<Long>, JCQueue> entry : executorReceiveQueueMap.entrySet()) {
        List<Integer> taskIds = StormCommon.executorIdToTasks(entry.getKey());
        for (Integer taskId : taskIds) {
            this.taskToExecutorQueue.put(taskId, entry.getValue());
        }
        this.localTaskIds.addAll(taskIds);
    }
    Collections.sort(localTaskIds);
    this.topologyConf = topologyConf;
    this.systemTopology = StormCommon.systemTopology(topologyConf, topology);
    // component id -> stream id -> output fields, for every component of the system topology.
    this.componentToStreamToFields = new HashMap<>();
    for (String c : ThriftTopologyUtils.getComponentIds(systemTopology)) {
        Map<String, Fields> streamToFields = new HashMap<>();
        for (Map.Entry<String, StreamInfo> stream :
            ThriftTopologyUtils.getComponentCommon(systemTopology, c).get_streams().entrySet()) {
            streamToFields.put(stream.getKey(), new Fields(stream.getValue().get_output_fields()));
        }
        componentToStreamToFields.put(c, streamToFields);
    }
    // Invert task -> component, then sort the task list of each component.
    this.componentToSortedTasks = Utils.reverseMap(taskToComponent);
    this.componentToSortedTasks.values().forEach(Collections::sort);
    this.endpointSocketLock = new ReentrantReadWriteLock();
    this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
    this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
    this.cachedNodeToHost = new AtomicReference<>(new HashMap<>());
    this.suicideCallback = Utils.mkSuicideFn();
    this.uptime = Utils.makeUptimeComputer();
    this.defaultSharedResources = makeDefaultResources();
    this.userSharedResources = makeUserResources();
    this.loadMapping = new LoadMapping();
    this.assignmentVersions = new AtomicReference<>(new HashMap<>());
    this.outboundTasks = workerOutboundTasks();
    // Single lookup with a null guard: a key that is absent or explicitly mapped to null leaves the
    // flag off instead of failing on unboxing. A non-Boolean value still fails the cast, as before.
    final Object alwaysTrySerialize = topologyConf.get(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
    this.trySerializeLocal = alwaysTrySerialize != null && (Boolean) alwaysTrySerialize;
    if (trySerializeLocal) {
        LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production)", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
    }
    int maxTaskId = getMaxTaskId(componentToSortedTasks);
    // 'this' is handed out before construction has finished; the fields assigned below
    // (bpTracker, deserializedWorkerHooks, receiver) are not yet set at this point.
    this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);
    this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue, metricRegistry, taskToComponent);
    this.deserializedWorkerHooks = deserializeWorkerHooks();
    LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
    // Callback passed to bind(); it is given this worker's transferLocalBatch as its handler.
    IConnectionCallback cb = new DeserializingConnectionCallback(topologyConf,
                                                                 getWorkerTopologyContext(),
                                                                 this::transferLocalBatch);
    // Supplies the current back pressure status; passed to bind() as the new-connection response.
    Supplier<Object> newConnectionResponse = () -> {
        BackPressureStatus bpStatus = bpTracker.getCurrStatus();
        LOG.info("Sending BackPressure status to new client. BPStatus: {}", bpStatus);
        return bpStatus;
    };
    this.receiver = this.mqContext.bind(topologyId, port, cb, newConnectionResponse);
}