private StreamingDataflowWorker()

in runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java [188:398]


  /**
   * Wires together all worker components for a Streaming Engine (direct path or cloud path) or a
   * Streaming Appliance pipeline.
   *
   * <p>The work-fetching harness is chosen from {@code options}:
   *
   * <ul>
   *   <li>Streaming Engine with direct path enabled: a {@link FanOutStreamingEngineWorkerHarness}.
   *       It is given a factory that builds a {@link StreamingEngineWorkCommitter} for each commit
   *       stream. All of those committers share a single commit-byte semaphore.
   *   <li>Streaming Engine without direct path: a {@link SingleSourceWorkerHarness} backed by
   *       pooled GetData and CommitWork streams obtained from {@code windmillServer}.
   *   <li>Streaming Appliance: a {@link SingleSourceWorkerHarness} backed by the appliance
   *       getData/commitWork/getWork calls on {@code windmillServer}.
   * </ul>
   *
   * <p>Also creates the status reporter, the active-work refresher and the status pages for the
   * chosen harness.
   *
   * @param windmillServer Windmill client. Used for GetWork, GetData, CommitWork and heartbeats on
   *     the non-direct paths. It is not referenced by the direct-path branch.
   * @param clientId identifies this worker in GetWork requests, the direct-path job header and the
   *     status pages
   * @param configFetcher supplies the global config handle; on the direct path it is also used to
   *     create the channel cache
   * @param computationStateCache resolves computation ids to computation state when scheduling and
   *     refreshing work
   * @param windmillStateCache per-computation state cache given to the work scheduler and status
   *     pages
   * @param workUnitExecutor executor passed to the work scheduler and reported on status pages
   * @param mapTaskExecutorFactory passed to the work scheduler
   * @param options worker options; select the harness and tune caches, commit threads and refresh
   *     periods
   * @param hotKeyLogger passed to the work scheduler
   * @param clock time source for the scheduler, active-work refresher and status pages
   * @param streamingWorkerStatusReporterFactory creates the status reporter for the harness built
   *     here
   * @param failureTracker passed to the work scheduler
   * @param workFailureProcessor passed to the work scheduler
   * @param streamingCounters passed to the work scheduler
   * @param memoryMonitor raw memory monitor. It is wrapped in a {@link BackgroundMemoryMonitor}
   *     for the field. The unwrapped instance gates GetWork and feeds the GetData metric tracker.
   * @param windmillStreamFactory stream factory for the fan-out harness and status pages
   * @param activeWorkRefreshExecutorFn executor handed to the {@link ActiveWorkRefresher}
   * @param stageInfoMap passed to the work scheduler
   * @param dispatcherClient must be non-null when Streaming Engine direct path is enabled (checked
   *     with {@code Preconditions.checkNotNull}); otherwise unused here
   */
  private StreamingDataflowWorker(
      WindmillServerStub windmillServer,
      long clientId,
      ComputationConfig.Fetcher configFetcher,
      ComputationStateCache computationStateCache,
      WindmillStateCache windmillStateCache,
      BoundedQueueExecutor workUnitExecutor,
      DataflowMapTaskExecutorFactory mapTaskExecutorFactory,
      DataflowWorkerHarnessOptions options,
      HotKeyLogger hotKeyLogger,
      Supplier<Instant> clock,
      StreamingWorkerStatusReporterFactory streamingWorkerStatusReporterFactory,
      FailureTracker failureTracker,
      WorkFailureProcessor workFailureProcessor,
      StreamingCounters streamingCounters,
      MemoryMonitor memoryMonitor,
      GrpcWindmillStreamFactory windmillStreamFactory,
      ScheduledExecutorService activeWorkRefreshExecutorFn,
      ConcurrentMap<String, StageInfo> stageInfoMap,
      @Nullable GrpcDispatcherClient dispatcherClient) {
    // Register standard file systems.
    FileSystems.setDefaultPipelineOptions(options);
    this.configFetcher = configFetcher;
    this.computationStateCache = computationStateCache;
    this.stateCache = windmillStateCache;
    this.readerCache =
        new ReaderCache(
            Duration.standardSeconds(options.getReaderCacheTimeoutSec()),
            Executors.newCachedThreadPool());
    this.options = options;
    this.workUnitExecutor = workUnitExecutor;
    // The field holds the wrapped monitor. Unqualified `memoryMonitor` references below resolve to
    // the constructor parameter (the raw MemoryMonitor), not to this field.
    this.memoryMonitor = BackgroundMemoryMonitor.create(memoryMonitor);
    // Appliance always uses a single commit thread; Streaming Engine uses at least one.
    this.numCommitThreads =
        options.isEnableStreamingEngine()
            ? Math.max(options.getWindmillServiceCommitThreads(), 1)
            : 1;

    StreamingWorkScheduler streamingWorkScheduler =
        StreamingWorkScheduler.create(
            options,
            clock,
            readerCache,
            mapTaskExecutorFactory,
            workUnitExecutor,
            stateCache::forComputation,
            failureTracker,
            workFailureProcessor,
            streamingCounters,
            hotKeyLogger,
            sampler,
            ID_GENERATOR,
            configFetcher.getGlobalConfigHandle(),
            stageInfoMap);
    ThrottlingGetDataMetricTracker getDataMetricTracker =
        new ThrottlingGetDataMetricTracker(memoryMonitor);
    // Status page members. Different implementations on whether the harness is streaming engine
    // direct path, streaming engine cloud path, or streaming appliance.
    @Nullable ChannelzServlet channelzServlet = null;
    Consumer<PrintWriter> getDataStatusProvider;
    Supplier<Long> currentActiveCommitBytesProvider;
    // Only created on the direct path; stays null otherwise.
    @Nullable ChannelCache channelCache = null;
    if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) {
      // Direct path pipelines.
      WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
      channelCache = createChannelCache(options, configFetcher);
      FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
          FanOutStreamingEngineWorkerHarness.create(
              createJobHeader(options, clientId),
              GetWorkBudget.builder()
                  .setItems(chooseMaxBundlesOutstanding(options))
                  .setBytes(MAX_GET_WORK_FETCH_BYTES)
                  .build(),
              windmillStreamFactory,
              // Work item receiver: drop items for unknown computations, otherwise block until
              // memory is available and hand the item to the scheduler.
              (workItem,
                  serializedWorkItemSize,
                  watermarks,
                  processingContext,
                  getWorkStreamLatencies) ->
                  computationStateCache
                      .get(processingContext.computationId())
                      .ifPresent(
                          computationState -> {
                            memoryMonitor.waitForResources("GetWork");
                            streamingWorkScheduler.scheduleWork(
                                computationState,
                                workItem,
                                serializedWorkItemSize,
                                watermarks,
                                processingContext,
                                getWorkStreamLatencies);
                          }),
              ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache),
              GetWorkBudgetDistributors.distributeEvenly(),
              Preconditions.checkNotNull(dispatcherClient),
              commitWorkStream ->
                  StreamingEngineWorkCommitter.builder()
                      // Share the commitByteSemaphore across all created workCommitters.
                      .setCommitByteSemaphore(maxCommitByteSemaphore)
                      .setBackendWorkerToken(commitWorkStream.backendWorkerToken())
                      .setOnCommitComplete(this::onCompleteCommit)
                      // This branch implies Streaming Engine, so numCommitThreads already equals
                      // max(windmillServiceCommitThreads, 1); reuse it as the single source of
                      // truth.
                      .setNumCommitSenders(numCommitThreads)
                      // Closing the wrapper is a no-op. NOTE(review): presumably the fan-out
                      // harness owns the underlying stream's lifecycle — confirm.
                      .setCommitWorkStreamFactory(
                          () -> CloseableStream.create(commitWorkStream, () -> {}))
                      .build(),
              getDataMetricTracker);
      getDataStatusProvider = getDataMetricTracker::printHtml;
      currentActiveCommitBytesProvider =
          fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
      channelzServlet =
          createChannelzServlet(
              options, fanOutStreamingEngineWorkerHarness::currentWindmillEndpoints);
      this.streamingWorkerHarness = fanOutStreamingEngineWorkerHarness;
    } else {
      // Non-direct path pipelines.
      Windmill.GetWorkRequest request =
          Windmill.GetWorkRequest.newBuilder()
              .setClientId(clientId)
              .setMaxItems(chooseMaxBundlesOutstanding(options))
              .setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
              .build();
      GetDataClient getDataClient;
      HeartbeatSender heartbeatSender;
      WorkCommitter workCommitter;
      GetWorkSender getWorkSender;
      if (options.isEnableStreamingEngine()) {
        // Streaming Engine cloud path: GetData and CommitWork go through stream pools.
        WindmillStreamPool<GetDataStream> getDataStreamPool =
            WindmillStreamPool.create(
                Math.max(1, options.getWindmillGetDataStreamCount()),
                GET_DATA_STREAM_TIMEOUT,
                windmillServer::getDataStream);
        getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
        heartbeatSender =
            createStreamingEngineHeartbeatSender(
                options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
        channelzServlet =
            createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints);
        workCommitter =
            StreamingEngineWorkCommitter.builder()
                .setCommitWorkStreamFactory(
                    WindmillStreamPool.create(
                            numCommitThreads,
                            COMMIT_STREAM_TIMEOUT,
                            windmillServer::commitWorkStream)
                        ::getCloseableStream)
                .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
                .setNumCommitSenders(numCommitThreads)
                .setOnCommitComplete(this::onCompleteCommit)
                .build();
        getWorkSender =
            GetWorkSender.forStreamingEngine(
                receiver -> windmillServer.getWorkStream(request, receiver));
      } else {
        // Streaming Appliance: uses the appliance getData/commitWork/getWork calls directly.
        getDataClient = new ApplianceGetDataClient(windmillServer, getDataMetricTracker);
        heartbeatSender = new ApplianceHeartbeatSender(windmillServer::getData);
        workCommitter =
            StreamingApplianceWorkCommitter.create(
                windmillServer::commitWork, this::onCompleteCommit);
        getWorkSender = GetWorkSender.forAppliance(() -> windmillServer.getWork(request));
      }

      getDataStatusProvider = getDataClient::printHtml;
      currentActiveCommitBytesProvider = workCommitter::currentActiveCommitBytes;

      this.streamingWorkerHarness =
          SingleSourceWorkerHarness.builder()
              .setStreamingWorkScheduler(streamingWorkScheduler)
              .setWorkCommitter(workCommitter)
              .setGetDataClient(getDataClient)
              .setComputationStateFetcher(this.computationStateCache::get)
              .setWaitForResources(() -> memoryMonitor.waitForResources("GetWork"))
              .setHeartbeatSender(heartbeatSender)
              .setThrottledTimeTracker(windmillServer::getAndResetThrottleTime)
              .setGetWorkSender(getWorkSender)
              .build();
    }

    this.workerStatusReporter =
        streamingWorkerStatusReporterFactory.createStatusReporter(streamingWorkerHarness);
    this.activeWorkRefresher =
        new ActiveWorkRefresher(
            clock,
            options.getActiveWorkRefreshPeriodMillis(),
            // Stuck-commit duration is only taken from options on Streaming Engine; appliance
            // passes 0. NOTE(review): 0 presumably disables stuck-commit detection — confirm.
            options.isEnableStreamingEngine()
                ? Math.max(options.getStuckCommitDurationMillis(), 0)
                : 0,
            computationStateCache::getAllPresentComputations,
            sampler,
            activeWorkRefreshExecutorFn,
            getDataMetricTracker::trackHeartbeats);

    this.statusPages =
        createStatusPageBuilder(options, windmillStreamFactory, memoryMonitor)
            .setClock(clock)
            .setClientId(clientId)
            .setIsRunning(running)
            .setStateCache(stateCache)
            .setComputationStateCache(this.computationStateCache)
            .setWorkUnitExecutor(workUnitExecutor)
            .setGlobalConfigHandle(configFetcher.getGlobalConfigHandle())
            .setChannelzServlet(channelzServlet)
            .setGetDataStatusProvider(getDataStatusProvider)
            .setCurrentActiveCommitBytes(currentActiveCommitBytesProvider)
            .setChannelCache(channelCache)
            .build();

    LOG.debug("isDirectPathEnabled: {}", options.getIsWindmillServiceDirectPathEnabled());
    LOG.debug("windmillServiceEnabled: {}", options.isEnableStreamingEngine());
    LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint());
    LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort());
    LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
  }