in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [1536:1642]
/**
 * Validates the queued operation referenced by {@code queueEntry} and places it back on the
 * backplane queue.
 *
 * <p>The steps, each expressed as a stage of a future chain, are:
 *
 * <ol>
 *   <li>fetch the {@code QueuedOperation} named by the entry's queued operation digest via
 *       {@code expect}; if that fails for any reason, rebuild it with {@code
 *       buildQueuedOperation};
 *   <li>validate the result with {@code validateQueuedOperationAndInputs};
 *   <li>if the original fetch succeeded, reuse {@code queueEntry} and its digest as-is; otherwise
 *       upload the rebuilt operation with {@code uploadQueuedOperation};
 *   <li>queue the resulting entry on the backplane with the operation's metadata replaced by the
 *       packed {@code QueuedOperationMetadata}.
 * </ol>
 *
 * <p>Any failure along the way is converted into a {@code com.google.rpc.Status} (a {@code
 * DEADLINE_EXCEEDED} status is rewritten to {@code UNAVAILABLE}) and handed to {@code
 * errorOperationFuture}.
 *
 * @param operation the operation to requeue; its name must equal the operation name recorded in
 *     the entry's {@code ExecuteEntry}, which is enforced with {@code checkState}
 * @param queueEntry the queue entry describing the operation
 * @param timeout passed through to {@code uploadQueuedOperation} when a re-upload is required
 * @return a future that completes with {@code null} once the operation has been queued, or, on
 *     failure, once the future handed to {@code errorOperationFuture} completes; it completes
 *     exceptionally only if the error handling itself throws an unchecked exception
 */
private ListenableFuture<Void> validateAndRequeueOperation(
    Operation operation, QueueEntry queueEntry, Duration timeout) {
  ExecuteEntry executeEntry = queueEntry.getExecuteEntry();
  String operationName = executeEntry.getOperationName();
  checkState(operationName.equals(operation.getName()));
  RequestMetadata requestMetadata = executeEntry.getRequestMetadata();
  ListenableFuture<QueuedOperation> fetchQueuedOperationFuture =
      expect(
          queueEntry.getQueuedOperationDigest(),
          QueuedOperation.parser(),
          operationTransformService,
          requestMetadata);
  Digest actionDigest = executeEntry.getActionDigest();
  // any failure to fetch the stored QueuedOperation falls back to rebuilding it
  ListenableFuture<QueuedOperation> queuedOperationFuture =
      catchingAsync(
          fetchQueuedOperationFuture,
          Throwable.class,
          (e) ->
              buildQueuedOperation(
                  operation.getName(), actionDigest, operationTransformService, requestMetadata),
          directExecutor());
  // handed to validation; this method does not read it back afterwards
  PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder();
  ListenableFuture<QueuedOperation> validatedFuture =
      transformAsync(
          queuedOperationFuture,
          (queuedOperation) -> {
            /* sync, throws StatusException - must be serviced via non-OTS */
            // NOTE(review): the executor passed below is operationTransformService; confirm
            // that this matches the "non-OTS" intent stated above
            validateQueuedOperationAndInputs(
                actionDigest, queuedOperation, preconditionFailure, requestMetadata);
            return immediateFuture(queuedOperation);
          },
          operationTransformService);
  // this little fork ensures that a successfully fetched QueuedOperation
  // will not be reuploaded
  ListenableFuture<QueuedOperationResult> uploadedFuture =
      transformAsync(
          validatedFuture,
          (queuedOperation) ->
              catchingAsync(
                  transform(
                      fetchQueuedOperationFuture,
                      (fetchedQueuedOperation) -> {
                        // the fetch succeeded: keep the existing entry and digest
                        QueuedOperationMetadata metadata =
                            QueuedOperationMetadata.newBuilder()
                                .setExecuteOperationMetadata(
                                    executeOperationMetadata(
                                        executeEntry, ExecutionStage.Value.QUEUED))
                                .setQueuedOperationDigest(queueEntry.getQueuedOperationDigest())
                                .setRequestMetadata(requestMetadata)
                                .build();
                        return new QueuedOperationResult(queueEntry, metadata);
                      },
                      operationTransformService),
                  Throwable.class,
                  // the fetch failed: the validated operation was rebuilt and must be uploaded
                  (e) ->
                      uploadQueuedOperation(
                          queuedOperation, executeEntry, operationTransformService, timeout),
                  operationTransformService),
          directExecutor());
  SettableFuture<Void> requeuedFuture = SettableFuture.create();
  addCallback(
      uploadedFuture,
      new FutureCallback<QueuedOperationResult>() {
        @Override
        public void onSuccess(QueuedOperationResult result) {
          try {
            Operation queueOperation =
                operation.toBuilder().setMetadata(Any.pack(result.metadata)).build();
            backplane.queue(result.entry, queueOperation);
          } catch (IOException | RuntimeException e) {
            // unchecked failures must also reach onFailure: an exception escaping this
            // callback would leave requeuedFuture incomplete forever
            onFailure(e);
            return;
          }
          requeuedFuture.set(null);
        }

        @Override
        public void onFailure(Throwable t) {
          try {
            logger.log(Level.SEVERE, "failed to requeue: " + operationName, t);
            com.google.rpc.Status status = StatusProto.fromThrowable(t);
            if (status == null) {
              logger.log(Level.SEVERE, "no rpc status from exception for " + operationName, t);
              status = asExecutionStatus(t);
            } else if (com.google.rpc.Code.forNumber(status.getCode())
                == com.google.rpc.Code.DEADLINE_EXCEEDED) {
              logger.log(
                  Level.WARNING,
                  "an rpc status was thrown with DEADLINE_EXCEEDED for "
                      + operationName
                      + ", discarding it",
                  t);
              status =
                  com.google.rpc.Status.newBuilder()
                      .setCode(com.google.rpc.Code.UNAVAILABLE.getNumber())
                      .setMessage("SUPPRESSED DEADLINE_EXCEEDED: " + t.getMessage())
                      .build();
            }
            logFailedStatus(actionDigest, status);
            SettableFuture<Void> errorFuture = SettableFuture.create();
            errorOperationFuture(operation, requestMetadata, status, errorFuture);
            // the listener fires on any completion of errorFuture, successful or not
            errorFuture.addListener(() -> requeuedFuture.set(null), operationTransformService);
          } catch (RuntimeException e) {
            // last resort: never leave the returned future hanging
            logger.log(Level.SEVERE, "failed to error operation: " + operationName, e);
            requeuedFuture.setException(e);
          }
        }
      },
      operationTransformService);
  return requeuedFuture;
}