public ShardInstance()

in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [297:487]


  /**
   * Creates a shard instance that coordinates work through {@code backplane}.
   *
   * <p>The dispatched monitor, operation queuer and Prometheus metrics collector threads are
   * constructed here but not started by this constructor (presumably they are started elsewhere,
   * e.g. in {@code start()} — confirm).
   *
   * @param name instance name, passed to the superclass
   * @param digestUtil digest utility, passed to the superclass
   * @param backplane shared backplane; its unsubscribe hook is wired to {@code stop()}
   * @param readThroughActionCache used as the superclass action cache and also retained here
   * @param runDispatchedMonitor whether to create the dispatched monitor thread
   * @param dispatchedMonitorIntervalSeconds interval, in seconds, passed to the {@code
   *     DispatchedMonitor}
   * @param runOperationQueuer whether to create the operation queuer thread
   * @param maxEntrySizeBytes stored on the instance for use outside this constructor
   * @param maxCpu stored on the instance for use outside this constructor
   * @param maxRequeueAttempts stored on the instance for use outside this constructor
   * @param maxActionTimeout stored on the instance for use outside this constructor
   * @param useDenyList stored on the instance for use outside this constructor
   * @param onStop stored on the instance for use outside this constructor
   * @param workerStubs per-worker instance cache; retained and handed to the remote input stream
   *     factory
   * @param actionCacheFetchService stored on the instance for use outside this constructor
   * @param ensureOutputsPresent passed to the superclass
   */
  public ShardInstance(
      String name,
      DigestUtil digestUtil,
      Backplane backplane,
      ReadThroughActionCache readThroughActionCache,
      boolean runDispatchedMonitor,
      int dispatchedMonitorIntervalSeconds,
      boolean runOperationQueuer,
      long maxEntrySizeBytes,
      int maxCpu,
      int maxRequeueAttempts,
      Duration maxActionTimeout,
      boolean useDenyList,
      Runnable onStop,
      LoadingCache<String, Instance> workerStubs,
      ListeningExecutorService actionCacheFetchService,
      boolean ensureOutputsPresent) {
    super(
        name,
        digestUtil,
        /* contentAddressableStorage=*/ null,
        /* actionCache=*/ readThroughActionCache,
        /* outstandingOperations=*/ null,
        /* completedOperations=*/ null,
        /* activeBlobWrites=*/ null,
        ensureOutputsPresent);
    this.backplane = backplane;
    this.readThroughActionCache = readThroughActionCache;
    this.workerStubs = workerStubs;
    this.onStop = onStop;
    this.maxEntrySizeBytes = maxEntrySizeBytes;
    this.maxCpu = maxCpu;
    this.maxRequeueAttempts = maxRequeueAttempts;
    this.maxActionTimeout = maxActionTimeout;
    this.useDenyList = useDenyList;
    this.actionCacheFetchService = actionCacheFetchService;
    backplane.setOnUnsubscribe(this::stop);

    remoteInputStreamFactory =
        new RemoteInputStreamFactory(
            backplane, rand, workerStubs, this::removeMalfunctioningWorker);

    if (runDispatchedMonitor) {
      // Named so the thread is identifiable in thread dumps, like the metrics collector below.
      dispatchedMonitor =
          new Thread(
              new DispatchedMonitor(
                  backplane, this::requeueOperation, dispatchedMonitorIntervalSeconds),
              "DispatchedMonitor");
    } else {
      dispatchedMonitor = null;
    }

    operationQueuer = runOperationQueuer ? createOperationQueuer() : null;

    prometheusMetricsThread =
        new Thread(
            () -> {
              while (!Thread.currentThread().isInterrupted()) {
                try {
                  TimeUnit.SECONDS.sleep(30);
                  BackplaneStatus backplaneStatus = backplaneStatus();
                  workerPoolSize.set(backplaneStatus.getActiveWorkersCount());
                  dispatchedOperationsSize.set(backplaneStatus.getDispatchedSize());
                  preQueueSize.set(backplaneStatus.getPrequeue().getSize());
                  updateQueueSizes(backplaneStatus.getOperationQueue().getProvisionsList());
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  break;
                } catch (Exception e) {
                  logger.log(Level.SEVERE, "Could not update RedisShardBackplane metrics", e);
                }
              }
            },
            "Prometheus Metrics Collector");
  }

  /**
   * Creates the (unstarted) operation queuer thread.
   *
   * <p>Each iteration puts a token into {@code transformTokensQueue} (presumably a bounded queue,
   * which limits the number of in-flight transforms — confirm). It then waits in {@code
   * ensureCanQueue}, takes one entry via {@code backplane.deprequeueOperation()}, and hands it to
   * {@code queue(...)}. The token is removed again once the returned future completes.
   *
   * <p>An {@code InterruptedException} ends the loop quietly. Any other exception is logged and
   * the instance is stopped.
   */
  private Thread createOperationQueuer() {
    return new Thread(
        new Runnable() {
          final Stopwatch stopwatch = Stopwatch.createUnstarted();

          ListenableFuture<Void> iterate() throws IOException, InterruptedException {
            ensureCanQueue(stopwatch); // wait for transition to canQueue state
            long canQueueUSecs = stopwatch.elapsed(MICROSECONDS);
            // exclude the (possibly blocking) deprequeue from the transform dispatch timing
            stopwatch.stop();
            ExecuteEntry executeEntry = backplane.deprequeueOperation();
            stopwatch.start();
            if (executeEntry == null) {
              logger.log(Level.SEVERE, "OperationQueuer: Got null from deprequeue...");
              return immediateFuture(null);
            }
            // half the watcher expiry, need to expose this from backplane
            Poller poller = new Poller(Durations.fromSeconds(5));
            String operationName = executeEntry.getOperationName();
            poller.resume(
                () -> {
                  try {
                    backplane.queueing(operationName);
                  } catch (IOException e) {
                    if (!stopping && !stopped) {
                      logger.log(
                          Level.SEVERE, format("error polling %s for queuing", operationName), e);
                    }
                    // mostly ignore, we will be stopped at some point later
                  }
                  return !stopping && !stopped;
                },
                () -> {},
                Deadline.after(5, MINUTES));
            try {
              logger.log(Level.FINE, "queueing " + operationName);
              ListenableFuture<Void> queueFuture = queue(executeEntry, poller, queueTimeout);
              addCallback(
                  queueFuture,
                  new FutureCallback<Void>() {
                    @Override
                    public void onSuccess(Void result) {
                      logger.log(Level.FINE, "successfully queued " + operationName);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                      logger.log(Level.SEVERE, "error queueing " + operationName, t);
                    }
                  },
                  operationTransformService);
              long operationTransformDispatchUSecs =
                  stopwatch.elapsed(MICROSECONDS) - canQueueUSecs;
              logger.log(
                  Level.FINE,
                  format(
                      "OperationQueuer: Dispatched To Transform %s: %dus in canQueue, %dus in transform dispatch",
                      operationName, canQueueUSecs, operationTransformDispatchUSecs));
              return queueFuture;
            } catch (Throwable t) {
              // queue() never produced a future, so nothing else will stop the poller
              poller.pause();
              logger.log(Level.SEVERE, "error queueing " + operationName, t);
              return immediateFuture(null);
            }
          }

          /** Removes one token from {@code transformTokensQueue} once an iteration completes. */
          void returnTransformToken() {
            try {
              transformTokensQueue.take();
            } catch (InterruptedException e) {
              logger.log(Level.SEVERE, "interrupted while returning transform token", e);
              // Restore the flag so the executor running this listener can observe it.
              // NOTE(review): the token stays in the queue in this case; if the queue is bounded
              // this permanently reduces queuer concurrency — consider a retrying take.
              Thread.currentThread().interrupt();
            }
          }

          @Override
          public void run() {
            logger.log(Level.FINE, "OperationQueuer: Running");
            try {
              for (; ; ) {
                transformTokensQueue.put(new Object());
                stopwatch.start();
                try {
                  iterate().addListener(this::returnTransformToken, operationTransformService);
                } catch (IOException e) {
                  // problems interacting with backplane: surface them, release the token, retry
                  logger.log(
                      Level.SEVERE, "OperationQueuer: error interacting with backplane", e);
                  transformTokensQueue.take();
                } finally {
                  stopwatch.reset();
                }
              }
            } catch (InterruptedException e) {
              // treat with exit
              operationQueuer = null;
              return;
            } catch (Exception t) {
              logger.log(Level.SEVERE, "OperationQueuer: fatal exception during iteration", t);
            } finally {
              logger.log(Level.FINE, "OperationQueuer: Exiting");
            }
            operationQueuer = null;
            try {
              stop();
            } catch (InterruptedException e) {
              logger.log(Level.SEVERE, "interrupted while stopping instance " + getName(), e);
            }
          }
        },
        "OperationQueuer");
  }