in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [297:487]
/**
 * Creates a shard instance whose shared state is held by {@code backplane}.
 *
 * <p>In addition to storing the supplied configuration, this constructor registers {@link
 * #stop()} as the backplane's unsubscribe callback and builds the {@code
 * RemoteInputStreamFactory}. It also constructs, but does not start, the background threads:
 * an optional dispatched monitor, an optional operation queuer, and the Prometheus metrics
 * collector. Starting them is not done in this constructor.
 *
 * <p>NOTE(review): {@code this::stop} and {@code this::removeMalfunctioningWorker} escape before
 * construction completes. Confirm that the backplane and the stream factory do not invoke them
 * during construction.
 *
 * @param name instance name, forwarded to the superclass
 * @param digestUtil digest utility, forwarded to the superclass
 * @param backplane shared backplane, used by the stream factory, monitor, queuer and metrics
 * @param readThroughActionCache used as the superclass action cache and also kept in a field
 * @param runDispatchedMonitor when true, create a {@code DispatchedMonitor} thread that calls
 *     {@code requeueOperation}; otherwise {@code dispatchedMonitor} is null
 * @param dispatchedMonitorIntervalSeconds interval passed to the {@code DispatchedMonitor}
 * @param runOperationQueuer when true, create the thread that takes entries from {@code
 *     backplane.deprequeueOperation()} and queues them; otherwise {@code operationQueuer} is null
 * @param maxEntrySizeBytes stored for use elsewhere in the class
 * @param maxCpu stored for use elsewhere in the class
 * @param maxRequeueAttempts stored for use elsewhere in the class
 * @param maxActionTimeout stored for use elsewhere in the class
 * @param useDenyList stored for use elsewhere in the class
 * @param onStop stored; how it is invoked is not visible in this constructor
 * @param workerStubs cache of worker {@code Instance} stubs (presumably keyed by worker name --
 *     confirm), passed to the remote input stream factory
 * @param actionCacheFetchService stored for use elsewhere in the class
 * @param ensureOutputsPresent forwarded to the superclass
 */
public ShardInstance(
    String name,
    DigestUtil digestUtil,
    Backplane backplane,
    ReadThroughActionCache readThroughActionCache,
    boolean runDispatchedMonitor,
    int dispatchedMonitorIntervalSeconds,
    boolean runOperationQueuer,
    long maxEntrySizeBytes,
    int maxCpu,
    int maxRequeueAttempts,
    Duration maxActionTimeout,
    boolean useDenyList,
    Runnable onStop,
    LoadingCache<String, Instance> workerStubs,
    ListeningExecutorService actionCacheFetchService,
    boolean ensureOutputsPresent) {
  super(
      name,
      digestUtil,
      /* contentAddressableStorage=*/ null,
      /* actionCache=*/ readThroughActionCache,
      /* outstandingOperations=*/ null,
      /* completedOperations=*/ null,
      /* activeBlobWrites=*/ null,
      ensureOutputsPresent);
  this.backplane = backplane;
  this.readThroughActionCache = readThroughActionCache;
  this.workerStubs = workerStubs;
  this.onStop = onStop;
  this.maxEntrySizeBytes = maxEntrySizeBytes;
  this.maxCpu = maxCpu;
  this.maxRequeueAttempts = maxRequeueAttempts;
  this.maxActionTimeout = maxActionTimeout;
  this.useDenyList = useDenyList;
  this.actionCacheFetchService = actionCacheFetchService;
  backplane.setOnUnsubscribe(this::stop);
  remoteInputStreamFactory =
      new RemoteInputStreamFactory(
          backplane, rand, workerStubs, this::removeMalfunctioningWorker);
  if (runDispatchedMonitor) {
    dispatchedMonitor =
        new Thread(
            new DispatchedMonitor(
                backplane, this::requeueOperation, dispatchedMonitorIntervalSeconds),
            "DispatchedMonitor");
  } else {
    dispatchedMonitor = null;
  }
  if (runOperationQueuer) {
    operationQueuer =
        new Thread(
            new Runnable() {
              // Measures one iteration. It is paused while blocked in deprequeueOperation so
              // that the dispatch timing logged below excludes time spent waiting for work.
              final Stopwatch stopwatch = Stopwatch.createUnstarted();

              /**
               * Takes one entry from the backplane prequeue and dispatches it to {@code queue}.
               *
               * @return the queue future, or an immediate null future when nothing was
               *     dispatched (null entry or a synchronous failure in {@code queue})
               */
              ListenableFuture<Void> iterate() throws IOException, InterruptedException {
                ensureCanQueue(stopwatch); // wait for transition to canQueue state
                long canQueueUSecs = stopwatch.elapsed(MICROSECONDS);
                stopwatch.stop();
                ExecuteEntry executeEntry = backplane.deprequeueOperation();
                stopwatch.start();
                if (executeEntry == null) {
                  logger.log(Level.SEVERE, "OperationQueuer: Got null from deprequeue...");
                  return immediateFuture(null);
                }
                String operationName = executeEntry.getOperationName();
                // Every 5s (half the watcher expiry, need to expose this from backplane), mark
                // the operation as queueing on the backplane. The callback returns false once
                // the instance is stopping or stopped, and the poller is given a 5 minute
                // deadline.
                Poller poller = new Poller(Durations.fromSeconds(5));
                poller.resume(
                    () -> {
                      try {
                        backplane.queueing(operationName);
                      } catch (IOException e) {
                        if (!stopping && !stopped) {
                          logger.log(
                              Level.SEVERE,
                              format("error polling %s for queuing", operationName),
                              e);
                        }
                        // mostly ignore, we will be stopped at some point later
                      }
                      return !stopping && !stopped;
                    },
                    () -> {},
                    Deadline.after(5, MINUTES));
                try {
                  logger.log(Level.FINE, "queueing " + operationName);
                  ListenableFuture<Void> queueFuture = queue(executeEntry, poller, queueTimeout);
                  addCallback(
                      queueFuture,
                      new FutureCallback<Void>() {
                        @Override
                        public void onSuccess(Void result) {
                          logger.log(Level.FINE, "successfully queued " + operationName);
                        }

                        @Override
                        public void onFailure(Throwable t) {
                          logger.log(Level.SEVERE, "error queueing " + operationName, t);
                        }
                      },
                      operationTransformService);
                  long operationTransformDispatchUSecs =
                      stopwatch.elapsed(MICROSECONDS) - canQueueUSecs;
                  logger.log(
                      Level.FINE,
                      format(
                          "OperationQueuer: Dispatched To Transform %s: %dus in canQueue, %dus in transform dispatch",
                          operationName, canQueueUSecs, operationTransformDispatchUSecs));
                  return queueFuture;
                } catch (Throwable t) {
                  // queue() failed synchronously: stop refreshing the queueing marker
                  poller.pause();
                  logger.log(Level.SEVERE, "error queueing " + operationName, t);
                  return immediateFuture(null);
                }
              }

              @Override
              public void run() {
                logger.log(Level.FINE, "OperationQueuer: Running");
                try {
                  for (; ; ) {
                    // Acquire a transform token; put blocks while the token queue is full
                    // (presumably bounded -- confirm), limiting in-flight transforms.
                    transformTokensQueue.put(new Object());
                    stopwatch.start();
                    try {
                      // The token is returned when the dispatched future completes.
                      iterate()
                          .addListener(
                              () -> {
                                try {
                                  transformTokensQueue.take();
                                } catch (InterruptedException e) {
                                  logger.log(
                                      Level.SEVERE,
                                      "interrupted while returning transform token",
                                      e);
                                  // preserve the interrupt for the executor running this listener
                                  Thread.currentThread().interrupt();
                                }
                              },
                              operationTransformService);
                    } catch (IOException e) {
                      // problems interacting with backplane: return the token and retry
                      transformTokensQueue.take();
                      logger.log(
                          Level.SEVERE, "OperationQueuer: error interacting with backplane", e);
                    } finally {
                      stopwatch.reset();
                    }
                  }
                } catch (InterruptedException e) {
                  // treat as exit, but keep the interrupt status visible on this thread
                  Thread.currentThread().interrupt();
                  operationQueuer = null;
                  return;
                } catch (Exception t) {
                  logger.log(
                      Level.SEVERE, "OperationQueuer: fatal exception during iteration", t);
                } finally {
                  logger.log(Level.FINE, "OperationQueuer: Exiting");
                }
                // Any other failure is fatal for the queuer: stop the whole instance.
                operationQueuer = null;
                try {
                  stop();
                } catch (InterruptedException e) {
                  logger.log(Level.SEVERE, "interrupted while stopping instance " + getName(), e);
                }
              }
            },
            "OperationQueuer");
  } else {
    operationQueuer = null;
  }
  // Every 30 seconds, refresh the backplane gauges from backplaneStatus() until interrupted.
  prometheusMetricsThread =
      new Thread(
          () -> {
            while (!Thread.currentThread().isInterrupted()) {
              try {
                TimeUnit.SECONDS.sleep(30);
                BackplaneStatus backplaneStatus = backplaneStatus();
                workerPoolSize.set(backplaneStatus.getActiveWorkersCount());
                dispatchedOperationsSize.set(backplaneStatus.getDispatchedSize());
                preQueueSize.set(backplaneStatus.getPrequeue().getSize());
                updateQueueSizes(backplaneStatus.getOperationQueue().getProvisionsList());
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
              } catch (Exception e) {
                logger.log(Level.SEVERE, "Could not update RedisShardBackplane metrics", e);
              }
            }
          },
          "Prometheus Metrics Collector");
}