public Worker()

in src/main/java/build/buildfarm/worker/shard/Worker.java [378:504]


  /**
   * Builds a shard worker from its configuration: instantiates the backplane, the storage and exec
   * filesystem, the worker context, the operation pipeline, and the gRPC server with its services.
   *
   * <p>The server is built here via {@code serverBuilder.build()}; this constructor does not call
   * {@code start()} on it.
   *
   * @param session suffix appended to the worker identifier ({@code
   *     "buildfarm-worker-<publicName>-<session>"}) that is handed to the backplane
   * @param serverBuilder gRPC server builder that receives the worker's services
   * @param config shard worker configuration; its public name must not be empty
   * @throws ConfigurationException if the configured public name is empty (the root and hash
   *     function validators called here presumably raise it as well — confirm against
   *     {@code getValidRoot} / {@code getValidHashFunction})
   * @throws IllegalArgumentException if no supported backplane is set in {@code config}
   */
  public Worker(String session, ServerBuilder<?> serverBuilder, ShardWorkerConfig config)
      throws ConfigurationException {
    super("BuildFarmShardWorker");
    this.config = config;

    // Fail fast: reject an empty public name before it is used to derive the identifier or any
    // other state is set up.
    if (config.getPublicName().isEmpty()) {
      throw new ConfigurationException("worker's public name should not be empty");
    }

    isCasShard = config.getCapabilities().getCas();
    String identifier = "buildfarm-worker-" + config.getPublicName() + "-" + session;
    root = getValidRoot(config);

    digestUtil = new DigestUtil(getValidHashFunction(config));

    // Only the redis backplane is supported; an unset or unrecognized case is a config error.
    ShardWorkerConfig.BackplaneCase backplaneCase = config.getBackplaneCase();
    switch (backplaneCase) {
      default:
      case BACKPLANE_NOT_SET:
        throw new IllegalArgumentException("Shard Backplane not set in config");
      case REDIS_SHARD_BACKPLANE_CONFIG:
        backplane =
            new RedisShardBackplane(
                config.getRedisShardBackplaneConfig(),
                identifier,
                this::stripOperation,
                this::stripQueuedOperation);
        break;
    }

    workerStubs =
        WorkerStubs.create(digestUtil, config.getShardWorkerInstanceConfig().getGrpcTimeout());

    // Executors shared by the storage layer and the exec filesystem created below.
    ExecutorService removeDirectoryService =
        newFixedThreadPool(
            /* nThreads=*/ 32,
            new ThreadFactoryBuilder().setNameFormat("remove-directory-pool-%d").build());
    ExecutorService accessRecorder = newSingleThreadExecutor();

    // The last argument is a no-op callback.
    // NOTE(review): presumably a per-worker failure hook — confirm against
    // RemoteInputStreamFactory.
    InputStreamFactory remoteInputStreamFactory =
        new RemoteInputStreamFactory(
            config.getPublicName(),
            backplane,
            new Random(),
            workerStubs,
            (worker, t, context) -> {});
    ContentAddressableStorage storage =
        createStorages(
            remoteInputStreamFactory, removeDirectoryService, accessRecorder, config.getCasList());
    execFileSystem =
        createExecFileSystem(
            remoteInputStreamFactory, removeDirectoryService, accessRecorder, storage);

    instance = new ShardWorkerInstance(config.getPublicName(), digestUtil, backplane, storage);

    // A CAS shard writes through the local writer; any other worker uses the remote writer.
    CasWriter writer = isCasShard ? new LocalCasWriter() : new RemoteCasWriter();

    DequeueMatchSettings matchSettings = new DequeueMatchSettings();
    matchSettings.acceptEverything = config.getDequeueMatchSettings().getAcceptEverything();
    matchSettings.allowUnmatched = config.getDequeueMatchSettings().getAllowUnmatched();

    ShardWorkerContext context =
        new ShardWorkerContext(
            config.getPublicName(),
            matchSettings,
            config.getDequeueMatchSettings().getPlatform(),
            config.getOperationPollPeriod(),
            backplane::pollOperation,
            config.getInputFetchStageWidth(),
            config.getExecuteStageWidth(),
            config.getInputFetchDeadline(),
            backplane,
            execFileSystem,
            // Input streams are served from the exec filesystem's storage combined with the
            // remote factory (presumably local first, remote on miss — confirm).
            new EmptyInputStreamFactory(
                new FailoverInputStreamFactory(
                    execFileSystem.getStorage(), remoteInputStreamFactory)),
            config.getExecutionPoliciesList(),
            instance,
            config.getDefaultActionTimeout(),
            config.getMaximumActionTimeout(),
            config.getLimitExecution(),
            config.getLimitGlobalExecution(),
            config.getOnlyMulticoreTests(),
            config.getErrorOperationRemainingResources(),
            writer);

    // Stages are constructed back to front because each stage receives its successor(s) as
    // constructor arguments.
    PipelineStage completeStage =
        new PutOperationStage((operation) -> context.deactivate(operation.getName()));
    PipelineStage reportResultStage = new ReportResultStage(context, completeStage, completeStage);
    PipelineStage executeActionStage =
        new ExecuteActionStage(context, reportResultStage, completeStage);
    PipelineStage inputFetchStage =
        new InputFetchStage(context, executeActionStage, new PutOperationStage(context::requeue));
    PipelineStage matchStage = new MatchStage(context, inputFetchStage, completeStage);

    // NOTE(review): the numeric argument is presumably a stage priority/ordering — confirm
    // against Pipeline.add.
    pipeline = new Pipeline();
    pipeline.add(matchStage, 4);
    pipeline.add(inputFetchStage, 3);
    pipeline.add(executeActionStage, 2);
    pipeline.add(reportResultStage, 1);

    server =
        serverBuilder
            .addService(healthStatusManager.getHealthService())
            .addService(
                new ContentAddressableStorageService(instance, /* deadlineAfter=*/ 1, DAYS))
            .addService(new ByteStreamService(instance, /* writeDeadlineAfter=*/ 1, DAYS))
            .addService(
                new WorkerProfileService(
                    storage,
                    inputFetchStage,
                    executeActionStage,
                    context,
                    completeStage,
                    backplane))
            .addService(new ShutDownWorkerGracefully(this, config))
            .build();

    logger.log(INFO, String.format("%s initialized", identifier));
  }