in src/main/java/build/buildfarm/instance/shard/RemoteInputStreamFactory.java [150:276]
public InputStream newInput(
Digest blobDigest,
long offset,
long deadlineAfter,
TimeUnit deadlineAfterUnits,
RequestMetadata requestMetadata)
throws IOException, InterruptedException {
Set<String> remoteWorkers;
Set<String> locationSet;
try {
Set<String> workers = backplane.getWorkers();
if (publicName == null) {
remoteWorkers = workers;
} else {
synchronized (workers) {
remoteWorkers = Sets.difference(workers, ImmutableSet.of(publicName)).immutableCopy();
}
}
locationSet =
Sets.newHashSet(Sets.intersection(backplane.getBlobLocationSet(blobDigest), workers));
} catch (IOException e) {
throw Status.fromThrowable(e).asRuntimeException();
}
if (publicName != null && locationSet.remove(publicName)) {
backplane.removeBlobLocation(blobDigest, publicName);
}
List<String> workersList = new ArrayList<>(locationSet);
boolean emptyWorkerList = workersList.isEmpty();
final ListenableFuture<List<String>> populatedWorkerListFuture;
if (emptyWorkerList) {
populatedWorkerListFuture =
transform(
correctMissingBlob(
backplane,
remoteWorkers,
locationSet,
this::workerStub,
blobDigest,
newDirectExecutorService(),
requestMetadata),
(foundOnWorkers) -> {
Iterables.addAll(workersList, foundOnWorkers);
return workersList;
},
directExecutor());
} else {
populatedWorkerListFuture = immediateFuture(workersList);
}
SettableFuture<InputStream> inputStreamFuture = SettableFuture.create();
addCallback(
populatedWorkerListFuture,
new WorkersCallback(rand) {
boolean triedCheck = emptyWorkerList;
@Override
public void onQueue(Deque<String> workers) {
Set<String> locationSet = Sets.newHashSet(workers);
boolean complete = false;
while (!complete && !workers.isEmpty()) {
try {
inputStreamFuture.set(
fetchBlobFromRemoteWorker(
blobDigest,
workers,
offset,
deadlineAfter,
deadlineAfterUnits,
requestMetadata));
complete = true;
} catch (IOException e) {
if (workers.isEmpty()) {
if (triedCheck) {
onFailure(e);
return;
}
triedCheck = true;
workersList.clear();
ListenableFuture<List<String>> checkedWorkerListFuture =
transform(
correctMissingBlob(
backplane,
remoteWorkers,
locationSet,
RemoteInputStreamFactory.this::workerStub,
blobDigest,
newDirectExecutorService(),
requestMetadata),
(foundOnWorkers) -> {
Iterables.addAll(workersList, foundOnWorkers);
return workersList;
},
directExecutor());
addCallback(checkedWorkerListFuture, this, directExecutor());
complete = true;
}
} catch (InterruptedException e) {
complete = true;
onFailure(e);
}
}
}
@SuppressWarnings("NullableProblems")
@Override
public void onFailure(Throwable t) {
Status status = Status.fromThrowable(t);
if (status.getCode() == Code.NOT_FOUND) {
inputStreamFuture.setException(
new NoSuchFileException(DigestUtil.toString(blobDigest)));
} else {
inputStreamFuture.setException(t);
}
}
},
directExecutor());
try {
return inputStreamFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
Throwables.throwIfUnchecked(cause);
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfInstanceOf(cause, InterruptedException.class);
throw new UncheckedExecutionException(cause);
}
}