in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [2061:2271]
private ListenableFuture<Void> transformAndQueue(
ExecuteEntry executeEntry,
Poller poller,
Operation operation,
Stopwatch stopwatch,
Duration timeout) {
long checkCacheUSecs = stopwatch.elapsed(MICROSECONDS);
ExecuteOperationMetadata metadata;
try {
metadata = operation.getMetadata().unpack(ExecuteOperationMetadata.class);
} catch (InvalidProtocolBufferException e) {
return immediateFailedFuture(e);
}
Digest actionDigest = metadata.getActionDigest();
SettableFuture<Void> queueFuture = SettableFuture.create();
logger.log(
Level.FINE,
format(
"ShardInstance(%s): queue(%s): fetching action %s",
getName(), operation.getName(), actionDigest.getHash()));
RequestMetadata requestMetadata = executeEntry.getRequestMetadata();
ListenableFuture<Action> actionFuture =
catchingAsync(
transformAsync(
expectAction(actionDigest, requestMetadata),
(action) -> {
if (action == null) {
throw Status.NOT_FOUND.asException();
} else if (action.getDoNotCache()) {
// invalidate our action cache result as well as watcher owner
readThroughActionCache.invalidate(DigestUtil.asActionKey(actionDigest));
backplane.putOperation(
operation.toBuilder().setMetadata(Any.pack(action)).build(),
metadata.getStage());
}
return immediateFuture(action);
},
operationTransformService),
StatusException.class,
(e) -> {
Status st = Status.fromThrowable(e);
if (st.getCode() == Code.NOT_FOUND) {
PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder();
preconditionFailure
.addViolationsBuilder()
.setType(VIOLATION_TYPE_MISSING)
.setSubject("blobs/" + DigestUtil.toString(actionDigest))
.setDescription(MISSING_ACTION);
checkPreconditionFailure(actionDigest, preconditionFailure.build());
}
throw st.asRuntimeException();
},
operationTransformService);
QueuedOperation.Builder queuedOperationBuilder = QueuedOperation.newBuilder();
ListenableFuture<ProfiledQueuedOperationMetadata.Builder> queuedFuture =
transformAsync(
actionFuture,
(action) -> {
logger.log(
Level.FINE,
format(
"ShardInstance(%s): queue(%s): fetched action %s transforming queuedOperation",
getName(), operation.getName(), actionDigest.getHash()));
Stopwatch transformStopwatch = Stopwatch.createStarted();
return transform(
transformQueuedOperation(
operation.getName(),
action,
action.getCommandDigest(),
action.getInputRootDigest(),
queuedOperationBuilder,
operationTransformService,
requestMetadata),
(queuedOperation) ->
ProfiledQueuedOperationMetadata.newBuilder()
.setQueuedOperation(queuedOperation)
.setQueuedOperationMetadata(
buildQueuedOperationMetadata(
metadata, requestMetadata, queuedOperation))
.setTransformedIn(
Durations.fromMicros(transformStopwatch.elapsed(MICROSECONDS))),
operationTransformService);
},
operationTransformService);
ListenableFuture<ProfiledQueuedOperationMetadata.Builder> validatedFuture =
transformAsync(
queuedFuture,
(profiledQueuedMetadata) -> {
logger.log(
Level.FINE,
format(
"ShardInstance(%s): queue(%s): queuedOperation %s transformed, validating",
getName(),
operation.getName(),
DigestUtil.toString(
profiledQueuedMetadata
.getQueuedOperationMetadata()
.getQueuedOperationDigest())));
long startValidateUSecs = stopwatch.elapsed(MICROSECONDS);
/* sync, throws StatusException */
validateQueuedOperation(actionDigest, profiledQueuedMetadata.getQueuedOperation());
return immediateFuture(
profiledQueuedMetadata.setValidatedIn(
Durations.fromMicros(stopwatch.elapsed(MICROSECONDS) - startValidateUSecs)));
},
operationTransformService);
ListenableFuture<ProfiledQueuedOperationMetadata> queuedOperationCommittedFuture =
transformAsync(
validatedFuture,
(profiledQueuedMetadata) -> {
logger.log(
Level.FINE,
format(
"ShardInstance(%s): queue(%s): queuedOperation %s validated, uploading",
getName(),
operation.getName(),
DigestUtil.toString(
profiledQueuedMetadata
.getQueuedOperationMetadata()
.getQueuedOperationDigest())));
ByteString queuedOperationBlob =
profiledQueuedMetadata.getQueuedOperation().toByteString();
Digest queuedOperationDigest =
profiledQueuedMetadata.getQueuedOperationMetadata().getQueuedOperationDigest();
long startUploadUSecs = stopwatch.elapsed(MICROSECONDS);
return transform(
writeBlobFuture(
queuedOperationDigest, queuedOperationBlob, requestMetadata, timeout),
(committedSize) ->
profiledQueuedMetadata
.setUploadedIn(
Durations.fromMicros(
stopwatch.elapsed(MICROSECONDS) - startUploadUSecs))
.build(),
operationTransformService);
},
operationTransformService);
// onQueue call?
addCallback(
queuedOperationCommittedFuture,
new FutureCallback<ProfiledQueuedOperationMetadata>() {
@Override
public void onSuccess(ProfiledQueuedOperationMetadata profiledQueuedMetadata) {
QueuedOperationMetadata queuedOperationMetadata =
profiledQueuedMetadata.getQueuedOperationMetadata();
Operation queueOperation =
operation.toBuilder().setMetadata(Any.pack(queuedOperationMetadata)).build();
QueueEntry queueEntry =
QueueEntry.newBuilder()
.setExecuteEntry(executeEntry)
.setQueuedOperationDigest(queuedOperationMetadata.getQueuedOperationDigest())
.setPlatform(
profiledQueuedMetadata.getQueuedOperation().getCommand().getPlatform())
.build();
try {
ensureCanQueue(stopwatch);
long startQueueUSecs = stopwatch.elapsed(MICROSECONDS);
poller.pause();
backplane.queue(queueEntry, queueOperation);
long elapsedUSecs = stopwatch.elapsed(MICROSECONDS);
long queueUSecs = elapsedUSecs - startQueueUSecs;
logger.log(
Level.FINE,
format(
"ShardInstance(%s): queue(%s): %dus checkCache, %dus transform, %dus validate, %dus upload, %dus queue, %dus elapsed",
getName(),
queueOperation.getName(),
checkCacheUSecs,
Durations.toMicros(profiledQueuedMetadata.getTransformedIn()),
Durations.toMicros(profiledQueuedMetadata.getValidatedIn()),
Durations.toMicros(profiledQueuedMetadata.getUploadedIn()),
queueUSecs,
elapsedUSecs));
queueFuture.set(null);
} catch (IOException e) {
onFailure(e.getCause() == null ? e : e.getCause());
} catch (InterruptedException e) {
// ignore
}
}
@Override
public void onFailure(Throwable t) {
poller.pause();
com.google.rpc.Status status = StatusProto.fromThrowable(t);
if (status == null) {
logger.log(
Level.SEVERE, "no rpc status from exception for " + operation.getName(), t);
status = asExecutionStatus(t);
} else if (com.google.rpc.Code.forNumber(status.getCode())
== com.google.rpc.Code.DEADLINE_EXCEEDED) {
logger.log(
Level.WARNING,
"an rpc status was thrown with DEADLINE_EXCEEDED for "
+ operation.getName()
+ ", discarding it",
t);
status =
com.google.rpc.Status.newBuilder()
.setCode(com.google.rpc.Code.UNAVAILABLE.getNumber())
.setMessage("SUPPRESSED DEADLINE_EXCEEDED: " + t.getMessage())
.build();
}
logFailedStatus(actionDigest, status);
errorOperationFuture(operation, requestMetadata, status, queueFuture);
}
},
operationTransformService);
return queueFuture;
}