in src/main/java/build/buildfarm/worker/shard/Worker.java [378:504]
public Worker(String session, ServerBuilder<?> serverBuilder, ShardWorkerConfig config)
throws ConfigurationException {
super("BuildFarmShardWorker");
this.config = config;
isCasShard = config.getCapabilities().getCas();
String identifier = "buildfarm-worker-" + config.getPublicName() + "-" + session;
root = getValidRoot(config);
if (config.getPublicName().isEmpty()) {
throw new ConfigurationException("worker's public name should not be empty");
}
digestUtil = new DigestUtil(getValidHashFunction(config));
ShardWorkerConfig.BackplaneCase backplaneCase = config.getBackplaneCase();
switch (backplaneCase) {
default:
case BACKPLANE_NOT_SET:
throw new IllegalArgumentException("Shard Backplane not set in config");
case REDIS_SHARD_BACKPLANE_CONFIG:
backplane =
new RedisShardBackplane(
config.getRedisShardBackplaneConfig(),
identifier,
this::stripOperation,
this::stripQueuedOperation);
break;
}
workerStubs =
WorkerStubs.create(digestUtil, config.getShardWorkerInstanceConfig().getGrpcTimeout());
ExecutorService removeDirectoryService =
newFixedThreadPool(
/* nThreads=*/ 32,
new ThreadFactoryBuilder().setNameFormat("remove-directory-pool-%d").build());
ExecutorService accessRecorder = newSingleThreadExecutor();
InputStreamFactory remoteInputStreamFactory =
new RemoteInputStreamFactory(
config.getPublicName(),
backplane,
new Random(),
workerStubs,
(worker, t, context) -> {});
ContentAddressableStorage storage =
createStorages(
remoteInputStreamFactory, removeDirectoryService, accessRecorder, config.getCasList());
execFileSystem =
createExecFileSystem(
remoteInputStreamFactory, removeDirectoryService, accessRecorder, storage);
instance = new ShardWorkerInstance(config.getPublicName(), digestUtil, backplane, storage);
// Create the appropriate writer for the context
CasWriter writer;
if (!isCasShard) {
writer = new RemoteCasWriter();
} else {
writer = new LocalCasWriter();
}
DequeueMatchSettings matchSettings = new DequeueMatchSettings();
matchSettings.acceptEverything = config.getDequeueMatchSettings().getAcceptEverything();
matchSettings.allowUnmatched = config.getDequeueMatchSettings().getAllowUnmatched();
ShardWorkerContext context =
new ShardWorkerContext(
config.getPublicName(),
matchSettings,
config.getDequeueMatchSettings().getPlatform(),
config.getOperationPollPeriod(),
backplane::pollOperation,
config.getInputFetchStageWidth(),
config.getExecuteStageWidth(),
config.getInputFetchDeadline(),
backplane,
execFileSystem,
new EmptyInputStreamFactory(
new FailoverInputStreamFactory(
execFileSystem.getStorage(), remoteInputStreamFactory)),
config.getExecutionPoliciesList(),
instance,
config.getDefaultActionTimeout(),
config.getMaximumActionTimeout(),
config.getLimitExecution(),
config.getLimitGlobalExecution(),
config.getOnlyMulticoreTests(),
config.getErrorOperationRemainingResources(),
writer);
PipelineStage completeStage =
new PutOperationStage((operation) -> context.deactivate(operation.getName()));
PipelineStage reportResultStage = new ReportResultStage(context, completeStage, completeStage);
PipelineStage executeActionStage =
new ExecuteActionStage(context, reportResultStage, completeStage);
PipelineStage inputFetchStage =
new InputFetchStage(context, executeActionStage, new PutOperationStage(context::requeue));
PipelineStage matchStage = new MatchStage(context, inputFetchStage, completeStage);
pipeline = new Pipeline();
// pipeline.add(errorStage, 0);
pipeline.add(matchStage, 4);
pipeline.add(inputFetchStage, 3);
pipeline.add(executeActionStage, 2);
pipeline.add(reportResultStage, 1);
server =
serverBuilder
.addService(healthStatusManager.getHealthService())
.addService(
new ContentAddressableStorageService(
instance, /* deadlineAfter=*/ 1, DAYS
/* requestLogLevel=*/ ))
.addService(new ByteStreamService(instance, /* writeDeadlineAfter=*/ 1, DAYS))
.addService(
new WorkerProfileService(
storage,
inputFetchStage,
executeActionStage,
context,
completeStage,
backplane))
.addService(new ShutDownWorkerGracefully(this, config))
.build();
logger.log(INFO, String.format("%s initialized", identifier));
}