public void getBlob()

in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [823:966]


  /**
   * Streams a blob to {@code blobObserver} by fetching it from one of the workers that the
   * backplane lists as holding it.
   *
   * <p>The candidate workers are the intersection of {@code backplane.getWorkers()} and {@code
   * backplane.getBlobLocationSet(blobDigest)}. If that intersection is empty, a correction pass
   * ({@code correctMissingBlob}) is run first and the workers it reports are used instead. If the
   * intersection was not empty but the fetch fails with {@code NOT_FOUND}, the correction pass is
   * run once and the fetch is retried with the corrected list. Any other error, or a second
   * {@code NOT_FOUND}, is forwarded to {@code blobObserver}.
   *
   * @param blobDigest digest of the blob to fetch
   * @param offset passed unchanged to {@code fetchBlobFromWorker}
   * @param count passed unchanged to {@code fetchBlobFromWorker}
   * @param blobObserver receives the chunks, the completion signal, and any terminal error
   * @param requestMetadata passed unchanged to {@code fetchBlobFromWorker}
   */
  public void getBlob(
      Digest blobDigest,
      long offset,
      long count,
      ServerCallStreamObserver<ByteString> blobObserver,
      RequestMetadata requestMetadata) {
    List<String> workersList;
    Set<String> workerSet;
    Set<String> locationSet;
    try {
      workerSet = backplane.getWorkers();
      locationSet = backplane.getBlobLocationSet(blobDigest);
      // Copy the intersection while holding the worker set's monitor.
      // NOTE(review): presumably other writers of this set synchronize on it too - confirm.
      synchronized (workerSet) {
        workersList = new ArrayList<>(Sets.intersection(locationSet, workerSet));
      }
    } catch (IOException e) {
      blobObserver.onError(e);
      return;
    }
    boolean emptyWorkerList = workersList.isEmpty();
    final ListenableFuture<List<String>> populatedWorkerListFuture;
    if (emptyWorkerList) {
      logger.log(
          Level.FINE,
          format(
              "worker list was initially empty for %s, attempting to correct",
              DigestUtil.toString(blobDigest)));
      populatedWorkerListFuture =
          correctedWorkersList(blobDigest, workerSet, locationSet, workersList, false);
    } else {
      populatedWorkerListFuture = immediateFuture(workersList);
    }

    // Captured so that fetches started from future callbacks run under the caller's context.
    Context ctx = Context.current();
    ServerCallStreamObserver<ByteString> chunkObserver =
        new UniformDelegateServerCallStreamObserver<ByteString>(blobObserver) {
          // Starts out true when the correction already ran up front, so that a NOT_FOUND
          // after an initial correction is reported instead of triggering a second one.
          boolean triedCheck = emptyWorkerList;

          @Override
          public void onNext(ByteString nextChunk) {
            blobObserver.onNext(nextChunk);
          }

          @Override
          public void onError(Throwable t) {
            Status status = Status.fromThrowable(t);
            if (status.getCode() == Code.NOT_FOUND && !triedCheck) {
              triedCheck = true;
              // Discard the candidates that just failed; the correction refills the list.
              workersList.clear();
              logger.log(
                  Level.FINE,
                  format(
                      "worker list was depleted for %s, attempting to correct",
                      DigestUtil.toString(blobDigest)));
              final ListenableFuture<List<String>> workersListFuture =
                  ShardInstance.this.correctedWorkersList(
                      blobDigest, workerSet, locationSet, workersList, true);
              // Reuse this observer for the retry; triedCheck is now set, so a further
              // NOT_FOUND falls through to the else branch below.
              final ServerCallStreamObserver<ByteString> checkedChunkObserver = this;
              addCallback(
                  workersListFuture,
                  new WorkersCallback(rand) {
                    @Override
                    public void onQueue(Deque<String> workers) {
                      ctx.run(
                          () ->
                              fetchBlobFromWorker(
                                  blobDigest,
                                  workers,
                                  offset,
                                  count,
                                  checkedChunkObserver,
                                  requestMetadata));
                    }

                    @Override
                    public void onFailure(Throwable t) {
                      blobObserver.onError(t);
                    }
                  },
                  directExecutor());
            } else {
              blobObserver.onError(t);
            }
          }

          @Override
          public void onCompleted() {
            blobObserver.onCompleted();
          }
        };
    // NOTE(review): WorkersCallback is defined outside this excerpt; it presumably turns the
    // worker list into the Deque handed to onQueue (using rand) - confirm.
    addCallback(
        populatedWorkerListFuture,
        new WorkersCallback(rand) {
          @Override
          public void onQueue(Deque<String> workers) {
            ctx.run(
                () ->
                    fetchBlobFromWorker(
                        blobDigest, workers, offset, count, chunkObserver, requestMetadata));
          }

          @Override
          public void onFailure(Throwable t) {
            blobObserver.onError(t);
          }
        },
        directExecutor());
  }

  /**
   * Runs {@code correctMissingBlob} for {@code blobDigest} and, when it completes, logs the
   * workers it reported and appends them to {@code workersList}.
   *
   * @param blobDigest digest of the blob whose locations are being corrected
   * @param workerSet worker set handed to {@code correctMissingBlob}
   * @param locationSet location set handed to {@code correctMissingBlob}
   * @param workersList list that receives the reported workers; the same instance is the
   *     result of the returned future
   * @param afterDepletion selects the log message: {@code true} when the correction follows a
   *     failed fetch, {@code false} when the list was empty from the start
   * @return a future that yields {@code workersList} once the reported workers have been added
   */
  private ListenableFuture<List<String>> correctedWorkersList(
      Digest blobDigest,
      Set<String> workerSet,
      Set<String> locationSet,
      List<String> workersList,
      boolean afterDepletion) {
    // NOTE(review): the correction is issued with default RequestMetadata rather than the
    // caller's requestMetadata - confirm this is intentional.
    return transform(
        correctMissingBlob(
            backplane,
            workerSet,
            locationSet,
            this::workerStub,
            blobDigest,
            directExecutor(),
            RequestMetadata.getDefaultInstance()),
        (foundOnWorkers) -> {
          logger.log(
              Level.FINE,
              afterDepletion
                  ? format(
                      "worker list was corrected after depletion for %s to be %s",
                      DigestUtil.toString(blobDigest), foundOnWorkers.toString())
                  : format(
                      "worker list was corrected for %s to be %s",
                      DigestUtil.toString(blobDigest), foundOnWorkers.toString()));
          Iterables.addAll(workersList, foundOnWorkers);
          return workersList;
        },
        directExecutor());
  }