in runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java [188:398]
/**
 * Wires up the worker: stores the supplied collaborators, builds the work scheduler, selects one
 * of three harness configurations based on {@code options}, and then creates the status
 * reporter, the active work refresher and the status pages.
 *
 * <p>Harness selection:
 *
 * <ul>
 *   <li>{@code isEnableStreamingEngine() && getIsWindmillServiceDirectPathEnabled()}: a {@code
 *       FanOutStreamingEngineWorkerHarness} ("direct path").
 *   <li>{@code isEnableStreamingEngine()} only: a {@code SingleSourceWorkerHarness} backed by
 *       pooled GetData/CommitWork streams obtained from {@code windmillServer}.
 *   <li>otherwise: a {@code SingleSourceWorkerHarness} backed by the appliance clients.
 * </ul>
 *
 * @param windmillServer used only by the two non-direct-path configurations, to open streams,
 *     fetch/commit work and read throttle time
 * @param clientId placed in the direct-path job header, in the non-direct-path {@code
 *     GetWorkRequest}, and on the status pages
 * @param configFetcher stored; its global config handle is handed to the scheduler, the
 *     streaming engine heartbeat sender and the status pages, and it is passed to {@code
 *     createChannelCache} on the direct path
 * @param computationStateCache stored; used to resolve the computation state for incoming work
 * @param windmillStateCache stored as {@code stateCache}; {@code stateCache::forComputation} is
 *     handed to the scheduler
 * @param workUnitExecutor stored; handed to the scheduler and the status pages
 * @param mapTaskExecutorFactory handed to the scheduler
 * @param options stored; drives harness selection and supplies the tunables read below
 * @param hotKeyLogger handed to the scheduler
 * @param clock handed to the scheduler, the active work refresher and the status pages
 * @param streamingWorkerStatusReporterFactory creates {@code workerStatusReporter} from the
 *     selected harness
 * @param failureTracker handed to the scheduler
 * @param workFailureProcessor handed to the scheduler
 * @param streamingCounters handed to the scheduler
 * @param memoryMonitor wrapped by {@code BackgroundMemoryMonitor.create} for the field of the
 *     same name; the unwrapped parameter is what the lambdas, the metric tracker and the status
 *     page builder in this constructor use
 * @param windmillStreamFactory handed to the direct-path harness and the status page builder
 * @param activeWorkRefreshExecutorFn the executor (a {@code ScheduledExecutorService}, despite
 *     the {@code Fn} suffix) handed to the {@code ActiveWorkRefresher}
 * @param stageInfoMap handed to the scheduler
 * @param dispatcherClient passed through {@code Preconditions.checkNotNull} on the direct path;
 *     not referenced by the other configurations
 */
private StreamingDataflowWorker(
WindmillServerStub windmillServer,
long clientId,
ComputationConfig.Fetcher configFetcher,
ComputationStateCache computationStateCache,
WindmillStateCache windmillStateCache,
BoundedQueueExecutor workUnitExecutor,
DataflowMapTaskExecutorFactory mapTaskExecutorFactory,
DataflowWorkerHarnessOptions options,
HotKeyLogger hotKeyLogger,
Supplier<Instant> clock,
StreamingWorkerStatusReporterFactory streamingWorkerStatusReporterFactory,
FailureTracker failureTracker,
WorkFailureProcessor workFailureProcessor,
StreamingCounters streamingCounters,
MemoryMonitor memoryMonitor,
GrpcWindmillStreamFactory windmillStreamFactory,
ScheduledExecutorService activeWorkRefreshExecutorFn,
ConcurrentMap<String, StageInfo> stageInfoMap,
@Nullable GrpcDispatcherClient dispatcherClient) {
// Register standard file systems.
FileSystems.setDefaultPipelineOptions(options);
this.configFetcher = configFetcher;
this.computationStateCache = computationStateCache;
this.stateCache = windmillStateCache;
// NOTE(review): the cached thread pool is created inline and is not retained by this
// constructor; confirm that ReaderCache is responsible for shutting it down.
this.readerCache =
new ReaderCache(
Duration.standardSeconds(options.getReaderCacheTimeoutSec()),
Executors.newCachedThreadPool());
this.options = options;
this.workUnitExecutor = workUnitExecutor;
// The field holds the wrapped monitor. Unqualified uses of `memoryMonitor` further down still
// resolve to the constructor parameter, i.e. the unwrapped instance.
this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor);
// Always at least one commit thread; the configured count is honored only for streaming engine.
this.numCommitThreads =
options.isEnableStreamingEngine()
? Math.max(options.getWindmillServiceCommitThreads(), 1)
: 1;
// The scheduler is shared by all three harness configurations below. `sampler` and
// `ID_GENERATOR` are members declared outside this constructor.
StreamingWorkScheduler streamingWorkScheduler =
StreamingWorkScheduler.create(
options,
clock,
readerCache,
mapTaskExecutorFactory,
workUnitExecutor,
stateCache::forComputation,
failureTracker,
workFailureProcessor,
streamingCounters,
hotKeyLogger,
sampler,
ID_GENERATOR,
configFetcher.getGlobalConfigHandle(),
stageInfoMap);
// Shared by every configuration: also feeds heartbeat tracking in the ActiveWorkRefresher.
ThrottlingGetDataMetricTracker getDataMetricTracker =
new ThrottlingGetDataMetricTracker(memoryMonitor);
// Status page members. Different implementations on whether the harness is streaming engine
// direct path, streaming engine cloud path, or streaming appliance.
// channelzServlet remains null for streaming appliance; channelCache remains null for every
// configuration except direct path. The two providers are assigned on every branch.
@Nullable ChannelzServlet channelzServlet = null;
Consumer<PrintWriter> getDataStatusProvider;
Supplier<Long> currentActiveCommitBytesProvider;
ChannelCache channelCache = null;
if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) {
// Direct path pipelines.
// Created once here so that the work committer factory lambda below hands the same semaphore
// to every committer it builds.
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
channelCache = createChannelCache(options, configFetcher);
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
FanOutStreamingEngineWorkerHarness.create(
createJobHeader(options, clientId),
GetWorkBudget.builder()
.setItems(chooseMaxBundlesOutstanding(options))
.setBytes(MAX_GET_WORK_FETCH_BYTES)
.build(),
windmillStreamFactory,
// Work item callback: resolve the computation state and schedule the work. When the cache
// has no state for the computation id, ifPresent makes this a no-op.
(workItem,
serializedWorkItemSize,
watermarks,
processingContext,
getWorkStreamLatencies) ->
computationStateCache
.get(processingContext.computationId())
.ifPresent(
computationState -> {
// Invoked before scheduling; presumably blocks while memory is constrained -- TODO confirm
// against MemoryMonitor.waitForResources.
memoryMonitor.waitForResources("GetWork");
streamingWorkScheduler.scheduleWork(
computationState,
workItem,
serializedWorkItemSize,
watermarks,
processingContext,
getWorkStreamLatencies);
}),
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),
GetWorkBudgetDistributors.distributeEvenly(),
// The dispatcher client parameter is @Nullable but is required on this path.
Preconditions.checkNotNull(dispatcherClient),
// Factory producing one work committer per commit work stream supplied by the harness.
commitWorkStream ->
StreamingEngineWorkCommitter.builder()
// Share the commitByteSemaphore across all created workCommitters.
.setCommitByteSemaphore(maxCommitByteSemaphore)
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
.setOnCommitComplete(this::onCompleteCommit)
// Same value as numCommitThreads, since streaming engine is enabled on this branch.
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
// The supplied stream is wrapped with a no-op close action, so closing the wrapper does
// nothing here. NOTE(review): presumably the harness owns the stream's lifecycle -- confirm.
.setCommitWorkStreamFactory(
() -> CloseableStream.create(commitWorkStream, () -> {}))
.build(),
getDataMetricTracker);
// On the direct path the GetData status comes straight from the metric tracker; the other
// configurations use the GetDataClient instead.
getDataStatusProvider = getDataMetricTracker::printHtml;
currentActiveCommitBytesProvider =
fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
channelzServlet =
createChannelzServlet(
options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
} else {
// Non-direct path pipelines.
// A single GetWorkRequest is built up front and captured by whichever GetWorkSender is created.
Windmill.GetWorkRequest request =
Windmill.GetWorkRequest.newBuilder()
.setClientId(clientId)
.setMaxItems(chooseMaxBundlesOutstanding(options))
.setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
.build();
// Each of the four collaborators below is assigned in both inner branches.
GetDataClient getDataClient;
HeartbeatSender heartbeatSender;
WorkCommitter workCommitter;
GetWorkSender getWorkSender;
if (options.isEnableStreamingEngine()) {
// Streaming engine without direct path: GetData and CommitWork go through stream pools
// created from windmillServer. The GetData pool holds at least one stream.
WindmillStreamPool<GetDataStream> getDataStreamPool =
WindmillStreamPool.create(
Math.max(1, options.getWindmillGetDataStreamCount()),
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
// The heartbeat sender is given the same GetData stream pool as the GetDataClient.
heartbeatSender =
createStreamingEngineHeartbeatSender(
options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
channelzServlet =
createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints);
// The commit stream pool and the number of commit senders are both sized by numCommitThreads.
// This committer gets its own semaphore from Commits.maxCommitByteSemaphore().
workCommitter =
StreamingEngineWorkCommitter.builder()
.setCommitWorkStreamFactory(
WindmillStreamPool.create(
numCommitThreads,
COMMIT_STREAM_TIMEOUT,
windmillServer::commitWorkStream)
::getCloseableStream)
.setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
.setNumCommitSenders(numCommitThreads)
.setOnCommitComplete(this::onCompleteCommit)
.build();
getWorkSender =
GetWorkSender.forStreamingEngine(
receiver -> windmillServer.getWorkStream(request, receiver));
} else {
// Streaming appliance: all collaborators call windmillServer directly; no channelz servlet
// is created on this branch.
getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker);
heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
workCommitter =
StreamingApplianceWorkCommitter.create(
windmillServer::commitWork, this::onCompleteCommit);
getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request));
}
getDataStatusProvider = getDataClient::printHtml;
currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes;
this.streamingWorkerHarness =
SingleSourceWorkerHarness.builder()
.setStreamingWorkScheduler(streamingWorkScheduler)
.setWorkCommitter(workCommitter)
.setGetDataClient(getDataClient)
.setComputationStateFetcher(this.computationStateCache::get)
// Uses the constructor parameter (unwrapped monitor), matching the direct-path callback.
.setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
.setHeartbeatSender(heartbeatSender)
.setThrottledTimeTracker(windmillServer::getAndResetThrottleTime)
.setGetWorkSender(getWorkSender)
.build();
}
// Everything below depends on the harness and providers selected above.
this.workerStatusReporter =
streamingWorkerStatusReporterFactory.createStatusReporter(streamingWorkerHarness);
// The stuck commit duration is clamped to be non-negative for streaming engine and forced to 0
// otherwise.
this.activeWorkRefresher =
new ActiveWorkRefresher(
clock,
options.getActiveWorkRefreshPeriodMillis(),
options.isEnableStreamingEngine()
? Math.max(options.getStuckCommitDurationMillis(), 0)
: 0,
computationStateCache::getAllPresentComputations,
sampler,
activeWorkRefreshExecutorFn,
getDataMetricTracker::trackHeartbeats);
// channelzServlet and channelCache may be null here, depending on the configuration chosen
// above. `running` is a member declared outside this constructor.
this.statusPages =
createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor)
.setClock(clock)
.setClientId(clientId)
.setIsRunning(running)
.setStateCache(stateCache)
.setComputationStateCache(this.computationStateCache)
.setWorkUnitExecutor(workUnitExecutor)
.setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
.setChannelzServlet(channelzServlet)
.setGetDataStatusProvider(getDataStatusProvider)
.setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
.setChannelCache(channelCache)
.build();
// Log the options that determined the configuration, at debug level.
LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled());
LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine());
LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
}