private ListenableFuture validateAndRequeueOperation()

in src/main/java/build/buildfarm/instance/shard/ShardInstance.java [1536:1642]


  /**
   * Validates the queued operation referenced by {@code queueEntry} and places it back on the
   * queue.
   *
   * <p>The flow is:
   *
   * <ol>
   *   <li>Fetch the {@code QueuedOperation} by the digest recorded in {@code queueEntry}; if the
   *       fetch fails for any reason, build one from the action digest instead.
   *   <li>Validate the queued operation and its inputs.
   *   <li>If the original fetch succeeded, reuse {@code queueEntry} with freshly built QUEUED
   *       metadata; otherwise upload the rebuilt queued operation.
   *   <li>Queue the operation on the backplane.
   * </ol>
   *
   * <p>Any failure along the way is logged, converted to an rpc status (DEADLINE_EXCEEDED is
   * replaced with UNAVAILABLE), and handed to {@code errorOperationFuture}.
   *
   * @param operation the operation to requeue; its name must equal the name in the execute entry
   *     of {@code queueEntry}, otherwise the {@code checkState} precondition fails
   * @param queueEntry the queue entry describing the operation
   * @param timeout passed through to {@code uploadQueuedOperation} when a reupload is required
   * @return a future that is completed with {@code null} once the operation has either been
   *     queued or been handed to {@code errorOperationFuture}; this method never completes it
   *     exceptionally
   */
  private ListenableFuture<Void> validateAndRequeueOperation(
      Operation operation, QueueEntry queueEntry, Duration timeout) {
    ExecuteEntry executeEntry = queueEntry.getExecuteEntry();
    String operationName = executeEntry.getOperationName();
    checkState(operationName.equals(operation.getName()));
    RequestMetadata requestMetadata = executeEntry.getRequestMetadata();
    ListenableFuture<QueuedOperation> fetchQueuedOperationFuture =
        expect(
            queueEntry.getQueuedOperationDigest(),
            QueuedOperation.parser(),
            operationTransformService,
            requestMetadata);
    Digest actionDigest = executeEntry.getActionDigest();
    // any fetch failure falls back to rebuilding the QueuedOperation from the action
    ListenableFuture<QueuedOperation> queuedOperationFuture =
        catchingAsync(
            fetchQueuedOperationFuture,
            Throwable.class,
            (e) ->
                buildQueuedOperation(
                    operationName, actionDigest, operationTransformService, requestMetadata),
            directExecutor());
    // populated by validation; not read again within this method
    PreconditionFailure.Builder preconditionFailure = PreconditionFailure.newBuilder();
    ListenableFuture<QueuedOperation> validatedFuture =
        transformAsync(
            queuedOperationFuture,
            (queuedOperation) -> {
              /* sync, throws StatusException - must be serviced via non-OTS */
              // NOTE(review): the comment above says non-OTS, yet this transform is
              // scheduled on operationTransformService - confirm which is intended
              validateQueuedOperationAndInputs(
                  actionDigest, queuedOperation, preconditionFailure, requestMetadata);
              return immediateFuture(queuedOperation);
            },
            operationTransformService);

    // this little fork ensures that a successfully fetched QueuedOperation
    // will not be reuploaded: the result is derived from the original fetch future,
    // and only when that branch fails is the validated operation uploaded
    ListenableFuture<QueuedOperationResult> uploadedFuture =
        transformAsync(
            validatedFuture,
            (queuedOperation) ->
                catchingAsync(
                    transform(
                        fetchQueuedOperationFuture,
                        (fetchedQueuedOperation) -> {
                          QueuedOperationMetadata metadata =
                              QueuedOperationMetadata.newBuilder()
                                  .setExecuteOperationMetadata(
                                      executeOperationMetadata(
                                          executeEntry, ExecutionStage.Value.QUEUED))
                                  .setQueuedOperationDigest(queueEntry.getQueuedOperationDigest())
                                  .setRequestMetadata(requestMetadata)
                                  .build();
                          return new QueuedOperationResult(queueEntry, metadata);
                        },
                        operationTransformService),
                    Throwable.class,
                    (e) ->
                        uploadQueuedOperation(
                            queuedOperation, executeEntry, operationTransformService, timeout),
                    operationTransformService),
            directExecutor());

    SettableFuture<Void> requeuedFuture = SettableFuture.create();
    addCallback(
        uploadedFuture,
        new FutureCallback<QueuedOperationResult>() {
          @Override
          public void onSuccess(QueuedOperationResult result) {
            try {
              Operation queueOperation =
                  operation.toBuilder().setMetadata(Any.pack(result.metadata)).build();
              backplane.queue(result.entry, queueOperation);
            } catch (IOException | RuntimeException e) {
              // an exception escaping this callback would leave requeuedFuture
              // incomplete forever, so unchecked failures take the failure path too
              onFailure(e);
              return;
            }
            requeuedFuture.set(null);
          }

          @Override
          public void onFailure(Throwable t) {
            logger.log(Level.SEVERE, "failed to requeue: " + operationName, t);
            com.google.rpc.Status status = StatusProto.fromThrowable(t);
            if (status == null) {
              logger.log(Level.SEVERE, "no rpc status from exception for " + operationName, t);
              status = asExecutionStatus(t);
            } else if (com.google.rpc.Code.forNumber(status.getCode())
                == com.google.rpc.Code.DEADLINE_EXCEEDED) {
              logger.log(
                  Level.WARNING,
                  "an rpc status was thrown with DEADLINE_EXCEEDED for "
                      + operationName
                      + ", discarding it",
                  t);
              status =
                  com.google.rpc.Status.newBuilder()
                      .setCode(com.google.rpc.Code.UNAVAILABLE.getNumber())
                      .setMessage("SUPPRESSED DEADLINE_EXCEEDED: " + t.getMessage())
                      .build();
            }
            logFailedStatus(actionDigest, status);
            SettableFuture<Void> errorFuture = SettableFuture.create();
            try {
              errorOperationFuture(operation, requestMetadata, status, errorFuture);
            } catch (RuntimeException e) {
              // a synchronous failure here must still release the listener below
              logger.log(Level.SEVERE, "failed to error operation: " + operationName, e);
              errorFuture.setException(e);
            }
            // the listener fires on success or failure of errorFuture alike
            errorFuture.addListener(() -> requeuedFuture.set(null), operationTransformService);
          }
        },
        operationTransformService);
    return requeuedFuture;
  }