public ListenableFuture execute()

in src/main/java/build/buildfarm/instance/server/AbstractServerInstance.java [1283:1408]


  public ListenableFuture<Void> execute(
      Digest actionDigest,
      boolean skipCacheLookup,
      ExecutionPolicy executionPolicy,
      ResultsCachePolicy resultsCachePolicy,
      RequestMetadata requestMetadata,
      Watcher watcher)
      throws InterruptedException {
    try {
      validateActionDigest("execute", actionDigest, requestMetadata);
    } catch (StatusException e) {
      com.google.rpc.Status status = StatusProto.fromThrowable(e);
      if (status == null) {
        getLogger().log(Level.SEVERE, "no rpc status from exception", e);
        status =
            com.google.rpc.Status.newBuilder()
                .setCode(Status.fromThrowable(e).getCode().value())
                .build();
      }
      logFailedStatus(actionDigest, status);
      Operation operation =
          Operation.newBuilder()
              .setDone(true)
              .setMetadata(
                  Any.pack(
                      ExecuteOperationMetadata.newBuilder()
                          .setStage(ExecutionStage.Value.COMPLETED)
                          .build()))
              .setResponse(Any.pack(ExecuteResponse.newBuilder().setStatus(status).build()))
              .build();
      try {
        watcher.observe(operation);
      } catch (Throwable t) {
        return immediateFailedFuture(t);
      }
      return immediateFuture(null);
    }

    ActionKey actionKey = DigestUtil.asActionKey(actionDigest);
    Operation operation = createOperation(actionKey);

    getLogger().info("Operation " + operation.getName() + " was created");

    getLogger()
        .info(
            format(
                "%s::execute(%s): %s",
                getName(), DigestUtil.toString(actionDigest), operation.getName()));

    putOperation(operation);

    ListenableFuture<Void> watchFuture = watchOperation(operation.getName(), watcher);

    ExecuteOperationMetadata metadata = expectExecuteOperationMetadata(operation);

    Operation.Builder operationBuilder = operation.toBuilder();
    final ListenableFuture<ActionResult> actionResultFuture;
    final ExecuteOperationMetadata cacheCheckMetadata;
    if (skipCacheLookup) {
      actionResultFuture = immediateFuture(null);
      cacheCheckMetadata = metadata;
    } else {
      cacheCheckMetadata = metadata.toBuilder().setStage(ExecutionStage.Value.CACHE_CHECK).build();
      putOperation(operationBuilder.setMetadata(Any.pack(metadata)).build());
      actionResultFuture = getActionResult(actionKey, requestMetadata);
    }

    Futures.addCallback(
        actionResultFuture,
        new FutureCallback<ActionResult>() {
          @SuppressWarnings("ConstantConditions")
          void onCompleted(@Nullable ActionResult actionResult) {
            final ExecuteOperationMetadata nextMetadata;
            if (actionResult == null) {
              nextMetadata =
                  cacheCheckMetadata.toBuilder().setStage(ExecutionStage.Value.QUEUED).build();
            } else {
              nextMetadata =
                  cacheCheckMetadata.toBuilder().setStage(ExecutionStage.Value.COMPLETED).build();
              operationBuilder
                  .setDone(true)
                  .setResponse(
                      Any.pack(
                          ExecuteResponse.newBuilder()
                              .setResult(actionResult)
                              .setStatus(
                                  com.google.rpc.Status.newBuilder()
                                      .setCode(Code.OK.getNumber())
                                      .build())
                              .setCachedResult(true)
                              .build()));
            }

            Operation nextOperation = operationBuilder.setMetadata(Any.pack(nextMetadata)).build();
            /* TODO record file count/size for matching purposes? */

            try {
              if (!nextOperation.getDone()) {
                updateOperationWatchers(
                    nextOperation); // updates watchers initially for queued stage
              }
              putOperation(nextOperation);
            } catch (InterruptedException e) {
              // ignore
            }
          }

          @Override
          public void onSuccess(ActionResult actionResult) {
            onCompleted(actionResult);
          }

          @SuppressWarnings("NullableProblems")
          @Override
          public void onFailure(Throwable t) {
            logger.log(
                Level.WARNING,
                format("action cache check of %s failed", DigestUtil.toString(actionDigest)),
                t);
            onCompleted(null);
          }
        },
        directExecutor());

    return watchFuture;
  }