in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [823:966]
// Streams a range of the blob identified by blobDigest to blobObserver by fetching it from one
// of the workers that the backplane reports as holding it.
//
// Flow:
//   1. Candidate workers are the intersection of the blob's location set and the worker set.
//   2. If that list is empty, correctMissingBlob is used up front to repopulate it.
//   3. Otherwise the fetch starts immediately; if it ends with NOT_FOUND, the list is cleared,
//      corrected once via correctMissingBlob, and the fetch is retried with the same observer.
//   4. Any other error, or a NOT_FOUND after a correction has already been attempted, is
//      forwarded to blobObserver unchanged.
//
// Parameters:
//   blobDigest      digest of the blob to read
//   offset, count   forwarded unchanged to fetchBlobFromWorker
//   blobObserver    receives chunks, completion, and any terminal error
//   requestMetadata forwarded unchanged to fetchBlobFromWorker
public void getBlob(
    Digest blobDigest,
    long offset,
    long count,
    ServerCallStreamObserver<ByteString> blobObserver,
    RequestMetadata requestMetadata) {
  List<String> workersList;
  Set<String> workerSet;
  Set<String> locationSet;
  try {
    workerSet = backplane.getWorkers();
    locationSet = backplane.getBlobLocationSet(blobDigest);
    // Sets.intersection is a live view; copy it while holding the worker set's monitor.
    synchronized (workerSet) {
      workersList = new ArrayList<>(Sets.intersection(locationSet, workerSet));
    }
  } catch (IOException e) {
    blobObserver.onError(e);
    return;
  }

  boolean emptyWorkerList = workersList.isEmpty();
  final ListenableFuture<List<String>> populatedWorkerListFuture;
  if (emptyWorkerList) {
    logger.log(
        Level.FINE,
        format(
            "worker list was initially empty for %s, attempting to correct",
            DigestUtil.toString(blobDigest)));
    populatedWorkerListFuture =
        correctedWorkersList(
            blobDigest,
            workerSet,
            locationSet,
            workersList,
            "worker list was corrected for %s to be %s");
  } else {
    populatedWorkerListFuture = immediateFuture(workersList);
  }

  // Captured here so that fetches started from callbacks run under the caller's context.
  Context ctx = Context.current();
  ServerCallStreamObserver<ByteString> chunkObserver =
      new UniformDelegateServerCallStreamObserver<ByteString>(blobObserver) {
        // A correction that already ran for the initially-empty list counts as the one attempt.
        boolean triedCheck = emptyWorkerList;

        @Override
        public void onNext(ByteString nextChunk) {
          blobObserver.onNext(nextChunk);
        }

        @Override
        public void onError(Throwable t) {
          Status status = Status.fromThrowable(t);
          if (status.getCode() == Code.NOT_FOUND && !triedCheck) {
            triedCheck = true;
            workersList.clear();
            logger.log(
                Level.FINE,
                format(
                    "worker list was depleted for %s, attempting to correct",
                    DigestUtil.toString(blobDigest)));
            // Retry with this same observer; triedCheck is now set, so a second NOT_FOUND
            // falls through to blobObserver.onError below.
            addCallback(
                correctedWorkersList(
                    blobDigest,
                    workerSet,
                    locationSet,
                    workersList,
                    "worker list was corrected after depletion for %s to be %s"),
                fetchBlobCallback(
                    ctx, blobDigest, offset, count, this, blobObserver, requestMetadata),
                directExecutor());
          } else {
            blobObserver.onError(t);
          }
        }

        @Override
        public void onCompleted() {
          blobObserver.onCompleted();
        }
      };

  addCallback(
      populatedWorkerListFuture,
      fetchBlobCallback(
          ctx, blobDigest, offset, count, chunkObserver, blobObserver, requestMetadata),
      directExecutor());
}

/**
 * Runs {@code correctMissingBlob} for {@code blobDigest} and, when it completes, logs the workers
 * it reported and appends them to {@code workersList}.
 *
 * <p>NOTE(review): the correction is issued with {@code RequestMetadata.getDefaultInstance()}
 * rather than the caller's request metadata, as the inlined code did; confirm this is intended.
 *
 * @param blobDigest digest of the blob whose locations are being corrected
 * @param workerSet worker set passed through to {@code correctMissingBlob}
 * @param locationSet location set passed through to {@code correctMissingBlob}
 * @param workersList mutable list that receives the workers found; also the future's result
 * @param correctedMessageFormat FINE-level log format taking two {@code %s} arguments: the digest
 *     string and the found workers
 * @return a future yielding {@code workersList} after the found workers have been appended
 */
private ListenableFuture<List<String>> correctedWorkersList(
    Digest blobDigest,
    Set<String> workerSet,
    Set<String> locationSet,
    List<String> workersList,
    String correctedMessageFormat) {
  return transform(
      correctMissingBlob(
          backplane,
          workerSet,
          locationSet,
          this::workerStub,
          blobDigest,
          directExecutor(),
          RequestMetadata.getDefaultInstance()),
      (foundOnWorkers) -> {
        logger.log(
            Level.FINE,
            format(
                correctedMessageFormat,
                DigestUtil.toString(blobDigest),
                foundOnWorkers.toString()));
        Iterables.addAll(workersList, foundOnWorkers);
        return workersList;
      },
      directExecutor());
}

/**
 * Builds the callback that starts {@code fetchBlobFromWorker} under {@code ctx} once the worker
 * queue is available, and reports a failed worker-list future to {@code blobObserver}.
 *
 * @param ctx context under which the fetch is run
 * @param blobDigest digest of the blob to fetch
 * @param offset forwarded to {@code fetchBlobFromWorker}
 * @param count forwarded to {@code fetchBlobFromWorker}
 * @param chunkObserver observer handed to {@code fetchBlobFromWorker}
 * @param blobObserver observer notified via {@code onError} if the worker-list future fails
 * @param requestMetadata forwarded to {@code fetchBlobFromWorker}
 * @return a new {@code WorkersCallback} constructed with this instance's {@code rand}
 */
private WorkersCallback fetchBlobCallback(
    Context ctx,
    Digest blobDigest,
    long offset,
    long count,
    ServerCallStreamObserver<ByteString> chunkObserver,
    ServerCallStreamObserver<ByteString> blobObserver,
    RequestMetadata requestMetadata) {
  return new WorkersCallback(rand) {
    @Override
    public void onQueue(Deque<String> workers) {
      ctx.run(
          () ->
              fetchBlobFromWorker(
                  blobDigest, workers, offset, count, chunkObserver, requestMetadata));
    }

    @Override
    public void onFailure(Throwable t) {
      blobObserver.onError(t);
    }
  };
}