public ListenableFuture runFully()

in processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java [111:407]


  public <T> ListenableFuture<T> runFully(final FrameProcessor<T> processor, @Nullable final String cancellationId)
  {
    final List<ReadableFrameChannel> inputChannels = processor.inputChannels();
    final List<WritableFrameChannel> outputChannels = processor.outputChannels();
    final SettableFuture<T> finished = registerCancelableFuture(SettableFuture.create(), true, cancellationId);

    if (finished.isDone()) {
      // Possibly due to starting life out being canceled.
      return finished;
    }

    class ExecutorRunnable implements Runnable
    {
      private final AwaitAnyWidget awaitAnyWidget = new AwaitAnyWidget(inputChannels);

      @Override
      public void run()
      {
        try {
          final List<ListenableFuture<?>> allWritabilityFutures = gatherWritabilityFutures();
          final List<ListenableFuture<?>> writabilityFuturesToWaitFor =
              allWritabilityFutures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());

          logProcessorStatusString(processor, finished, allWritabilityFutures);

          if (!writabilityFuturesToWaitFor.isEmpty()) {
            runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor));
            return;
          }

          final Optional<ReturnOrAwait<T>> maybeResult = runProcessorNow();

          if (!maybeResult.isPresent()) {
            // Processor exited abnormally. Just exit; cleanup would have been handled elsewhere.
            return;
          }

          final ReturnOrAwait<T> result = maybeResult.get();
          logProcessorStatusString(processor, finished, null);

          if (result.isReturn()) {
            succeed(result.value());
          } else {
            // Don't retain a reference to this set: it may be mutated the next time the processor runs.
            final IntSet await = result.awaitSet();

            if (await.isEmpty()) {
              exec.execute(ExecutorRunnable.this);
            } else if (result.isAwaitAll() || await.size() == 1) {
              final List<ListenableFuture<?>> readabilityFutures = new ArrayList<>();

              for (final int channelNumber : await) {
                final ReadableFrameChannel channel = inputChannels.get(channelNumber);
                if (!channel.isFinished() && !channel.canRead()) {
                  readabilityFutures.add(channel.readabilityFuture());
                }
              }

              if (readabilityFutures.isEmpty()) {
                exec.execute(ExecutorRunnable.this);
              } else {
                runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures));
              }
            } else {
              // Await any.
              runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await));
            }
          }
        }
        catch (Throwable e) {
          fail(e);
        }
      }

      private List<ListenableFuture<?>> gatherWritabilityFutures()
      {
        final List<ListenableFuture<?>> futures = new ArrayList<>();

        for (final WritableFrameChannel channel : outputChannels) {
          futures.add(channel.writabilityFuture());
        }

        return futures;
      }

      /**
       * Executes {@link FrameProcessor#runIncrementally} on the currently-readable inputs, while respecting
       * cancellation. Returns an empty Optional if the processor exited abnormally (canceled or failed). Returns a
       * present Optional if the processor ran successfully. Throws an exception if the processor does.
       */
      private Optional<ReturnOrAwait<T>> runProcessorNow()
      {
        final IntSet readableInputs = new IntOpenHashSet(inputChannels.size());

        for (int i = 0; i < inputChannels.size(); i++) {
          final ReadableFrameChannel channel = inputChannels.get(i);
          if (channel.isFinished() || channel.canRead()) {
            readableInputs.add(i);
          }
        }

        if (cancellationId != null) {
          // After this synchronized block, our thread may be interrupted by cancellations, because "cancel"
          // checks "runningProcessors".
          synchronized (lock) {
            if (cancelableProcessors.containsEntry(cancellationId, processor)) {
              runningProcessors.put(processor, Thread.currentThread());
            } else {
              // Processor has been canceled. We don't need to handle cleanup, because someone else did it.
              return Optional.empty();
            }
          }
        }

        final String threadName = Thread.currentThread().getName();
        boolean canceled = false;
        Either<Throwable, ReturnOrAwait<T>> retVal;

        try {
          if (Thread.interrupted()) {
            throw new InterruptedException();
          }

          if (cancellationId != null) {
            // Set the thread name to something involving the cancellationId, to make thread dumps more useful.
            Thread.currentThread().setName(threadName + "-" + cancellationId);
          }

          retVal = Either.value(processor.runIncrementally(readableInputs));
        }
        catch (Throwable e) {
          // Catch InterruptedException too: interrupt was meant for the processor, not us.
          retVal = Either.error(e);
        }
        finally {
          if (cancellationId != null) {
            // After this synchronized block, our thread will no longer be interrupted by cancellations,
            // because "cancel" checks "runningProcessors".
            synchronized (lock) {
              if (Thread.interrupted()) {
                // ignore: interrupt was meant for the processor, but came after the processor already exited.
              }

              runningProcessors.remove(processor);
              lock.notifyAll();

              if (!cancelableProcessors.containsEntry(cancellationId, processor)) {
                // Processor has been canceled by one of the "cancel" methods. They will handle cleanup.
                canceled = true;
              }
            }

            // Restore original thread name.
            Thread.currentThread().setName(threadName);
          }
        }

        if (canceled) {
          return Optional.empty();
        } else {
          return Optional.of(retVal.valueOrThrow());
        }
      }

      private <V> void runProcessorAfterFutureResolves(final ListenableFuture<V> future)
      {
        final ListenableFuture<V> cancelableFuture = registerCancelableFuture(future, false, cancellationId);

        Futures.addCallback(
            cancelableFuture,
            new FutureCallback<>()
            {
              @Override
              public void onSuccess(final V ignored)
              {
                try {
                  exec.execute(ExecutorRunnable.this);
                }
                catch (Throwable e) {
                  fail(e);
                }
              }

              @Override
              public void onFailure(Throwable t)
              {
                // Ignore cancellation.
                if (!cancelableFuture.isCancelled()) {
                  fail(t);
                }
              }
            },
            MoreExecutors.directExecutor()
        );
      }

      /**
       * Called when a processor succeeds.
       *
       * Runs the cleanup routine and sets the finished future to a particular value. If cleanup fails, sets the
       * finished future to an error.
       */
      private void succeed(T value)
      {
        try {
          doProcessorCleanup();
        }
        catch (Throwable e) {
          finished.setException(e);
          return;
        }

        finished.set(value);
      }

      /**
       * Called when a processor fails.
       *
       * Cancels output channels, runs the cleanup routine, and sets the finished future to an error.
       */
      private void fail(Throwable e)
      {
        for (final WritableFrameChannel outputChannel : outputChannels) {
          try {
            outputChannel.fail(e);
          }
          catch (Throwable e1) {
            e.addSuppressed(e1);
          }
        }

        try {
          doProcessorCleanup();
        }
        catch (Throwable e1) {
          e.addSuppressed(e1);
        }

        finished.setException(e);
      }

      /**
       * Called when a processor exits via {@link #succeed} or {@link #fail}. Not called when a processor
       * is canceled.
       */
      void doProcessorCleanup() throws IOException
      {
        final boolean doCleanup;

        if (cancellationId != null) {
          synchronized (lock) {
            // Skip cleanup if the processor is no longer in cancelableProcessors. This means one of the "cancel"
            // methods is going to do the cleanup.
            doCleanup = cancelableProcessors.remove(cancellationId, processor);
          }
        } else {
          doCleanup = true;
        }

        if (doCleanup) {
          processor.cleanup();
        }
      }
    }

    final ExecutorRunnable runnable = new ExecutorRunnable();

    finished.addListener(
        () -> {
          logProcessorStatusString(processor, finished, null);

          // If the future was canceled, and the processor is cancelable, then cancel the processor too.
          if (finished.isCancelled() && cancellationId != null) {
            boolean didRemoveFromCancelableProcessors;

            synchronized (lock) {
              didRemoveFromCancelableProcessors = cancelableProcessors.remove(cancellationId, processor);
            }

            if (didRemoveFromCancelableProcessors) {
              try {
                cancel(Collections.singleton(processor));
              }
              catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            }
          }
        },
        Execs.directExecutor()
    );

    logProcessorStatusString(processor, finished, null);
    registerCancelableProcessor(processor, cancellationId);
    exec.execute(runnable);
    return finished;
  }