in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [648:738]
private void findMissingBlobsOnWorker(
String requestId,
Iterable<Digest> blobDigests,
Deque<String> workers,
ImmutableList.Builder<FindMissingResponseEntry> responses,
int originalSize,
Executor executor,
SettableFuture<Iterable<Digest>> missingDigestsFuture,
RequestMetadata requestMetadata) {
String worker = workers.removeFirst();
ListenableFuture<Iterable<Digest>> workerMissingBlobsFuture =
workerStub(worker).findMissingBlobs(blobDigests, requestMetadata);
Stopwatch stopwatch = Stopwatch.createStarted();
addCallback(
workerMissingBlobsFuture,
new FutureCallback<Iterable<Digest>>() {
@Override
public void onSuccess(Iterable<Digest> missingDigests) {
if (Iterables.isEmpty(missingDigests) || workers.isEmpty()) {
missingDigestsFuture.set(missingDigests);
} else {
responses.add(
new FindMissingResponseEntry(
worker,
stopwatch.elapsed(MICROSECONDS),
null,
Iterables.size(missingDigests)));
findMissingBlobsOnWorker(
requestId,
missingDigests,
workers,
responses,
originalSize,
executor,
missingDigestsFuture,
requestMetadata);
}
}
@Override
public void onFailure(Throwable t) {
responses.add(
new FindMissingResponseEntry(
worker, stopwatch.elapsed(MICROSECONDS), t, Iterables.size(blobDigests)));
Status status = Status.fromThrowable(t);
if (status.getCode() == Code.UNAVAILABLE || status.getCode() == Code.UNIMPLEMENTED) {
removeMalfunctioningWorker(worker, t, "findMissingBlobs(" + requestId + ")");
} else if (status.getCode() == Code.DEADLINE_EXCEEDED) {
for (FindMissingResponseEntry response : responses.build()) {
logger.log(
response.exception == null ? Level.WARNING : Level.SEVERE,
format(
"DEADLINE_EXCEEDED: findMissingBlobs(%s) %s: %d remaining of %d %dus%s",
requestId,
response.worker,
response.stillMissingAfter,
originalSize,
response.elapsedMicros,
response.exception != null ? ": " + response.exception.toString() : ""));
}
missingDigestsFuture.setException(status.asException());
} else if (status.getCode() == Code.CANCELLED
|| Context.current().isCancelled()
|| !SHARD_IS_RETRIABLE.test(status)) {
// do nothing further if we're cancelled
missingDigestsFuture.setException(status.asException());
} else {
// why not, always
workers.addLast(worker);
}
if (!missingDigestsFuture.isDone()) {
if (workers.isEmpty()) {
missingDigestsFuture.set(blobDigests);
} else {
findMissingBlobsOnWorker(
requestId,
blobDigests,
workers,
responses,
originalSize,
executor,
missingDigestsFuture,
requestMetadata);
}
}
}
},
executor);
}