in src/main/java/build/buildfarm/instance/server/AbstractServerInstance.java [1283:1408]
public ListenableFuture<Void> execute(
Digest actionDigest,
boolean skipCacheLookup,
ExecutionPolicy executionPolicy,
ResultsCachePolicy resultsCachePolicy,
RequestMetadata requestMetadata,
Watcher watcher)
throws InterruptedException {
try {
validateActionDigest("execute", actionDigest, requestMetadata);
} catch (StatusException e) {
com.google.rpc.Status status = StatusProto.fromThrowable(e);
if (status == null) {
getLogger().log(Level.SEVERE, "no rpc status from exception", e);
status =
com.google.rpc.Status.newBuilder()
.setCode(Status.fromThrowable(e).getCode().value())
.build();
}
logFailedStatus(actionDigest, status);
Operation operation =
Operation.newBuilder()
.setDone(true)
.setMetadata(
Any.pack(
ExecuteOperationMetadata.newBuilder()
.setStage(ExecutionStage.Value.COMPLETED)
.build()))
.setResponse(Any.pack(ExecuteResponse.newBuilder().setStatus(status).build()))
.build();
try {
watcher.observe(operation);
} catch (Throwable t) {
return immediateFailedFuture(t);
}
return immediateFuture(null);
}
ActionKey actionKey = DigestUtil.asActionKey(actionDigest);
Operation operation = createOperation(actionKey);
getLogger().info("Operation " + operation.getName() + " was created");
getLogger()
.info(
format(
"%s::execute(%s): %s",
getName(), DigestUtil.toString(actionDigest), operation.getName()));
putOperation(operation);
ListenableFuture<Void> watchFuture = watchOperation(operation.getName(), watcher);
ExecuteOperationMetadata metadata = expectExecuteOperationMetadata(operation);
Operation.Builder operationBuilder = operation.toBuilder();
final ListenableFuture<ActionResult> actionResultFuture;
final ExecuteOperationMetadata cacheCheckMetadata;
if (skipCacheLookup) {
actionResultFuture = immediateFuture(null);
cacheCheckMetadata = metadata;
} else {
cacheCheckMetadata = metadata.toBuilder().setStage(ExecutionStage.Value.CACHE_CHECK).build();
putOperation(operationBuilder.setMetadata(Any.pack(metadata)).build());
actionResultFuture = getActionResult(actionKey, requestMetadata);
}
Futures.addCallback(
actionResultFuture,
new FutureCallback<ActionResult>() {
@SuppressWarnings("ConstantConditions")
void onCompleted(@Nullable ActionResult actionResult) {
final ExecuteOperationMetadata nextMetadata;
if (actionResult == null) {
nextMetadata =
cacheCheckMetadata.toBuilder().setStage(ExecutionStage.Value.QUEUED).build();
} else {
nextMetadata =
cacheCheckMetadata.toBuilder().setStage(ExecutionStage.Value.COMPLETED).build();
operationBuilder
.setDone(true)
.setResponse(
Any.pack(
ExecuteResponse.newBuilder()
.setResult(actionResult)
.setStatus(
com.google.rpc.Status.newBuilder()
.setCode(Code.OK.getNumber())
.build())
.setCachedResult(true)
.build()));
}
Operation nextOperation = operationBuilder.setMetadata(Any.pack(nextMetadata)).build();
/* TODO record file count/size for matching purposes? */
try {
if (!nextOperation.getDone()) {
updateOperationWatchers(
nextOperation); // updates watchers initially for queued stage
}
putOperation(nextOperation);
} catch (InterruptedException e) {
// ignore
}
}
@Override
public void onSuccess(ActionResult actionResult) {
onCompleted(actionResult);
}
@SuppressWarnings("NullableProblems")
@Override
public void onFailure(Throwable t) {
logger.log(
Level.WARNING,
format("action cache check of %s failed", DigestUtil.toString(actionDigest)),
t);
onCompleted(null);
}
},
directExecutor());
return watchFuture;
}