in processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java [111:407]
public <T> ListenableFuture<T> runFully(final FrameProcessor<T> processor, @Nullable final String cancellationId)
{
final List<ReadableFrameChannel> inputChannels = processor.inputChannels();
final List<WritableFrameChannel> outputChannels = processor.outputChannels();
final SettableFuture<T> finished = registerCancelableFuture(SettableFuture.create(), true, cancellationId);
if (finished.isDone()) {
// Possibly due to starting life out being canceled.
return finished;
}
class ExecutorRunnable implements Runnable
{
private final AwaitAnyWidget awaitAnyWidget = new AwaitAnyWidget(inputChannels);
@Override
public void run()
{
try {
final List<ListenableFuture<?>> allWritabilityFutures = gatherWritabilityFutures();
final List<ListenableFuture<?>> writabilityFuturesToWaitFor =
allWritabilityFutures.stream().filter(f -> !f.isDone()).collect(Collectors.toList());
logProcessorStatusString(processor, finished, allWritabilityFutures);
if (!writabilityFuturesToWaitFor.isEmpty()) {
runProcessorAfterFutureResolves(Futures.allAsList(writabilityFuturesToWaitFor));
return;
}
final Optional<ReturnOrAwait<T>> maybeResult = runProcessorNow();
if (!maybeResult.isPresent()) {
// Processor exited abnormally. Just exit; cleanup would have been handled elsewhere.
return;
}
final ReturnOrAwait<T> result = maybeResult.get();
logProcessorStatusString(processor, finished, null);
if (result.isReturn()) {
succeed(result.value());
} else {
// Don't retain a reference to this set: it may be mutated the next time the processor runs.
final IntSet await = result.awaitSet();
if (await.isEmpty()) {
exec.execute(ExecutorRunnable.this);
} else if (result.isAwaitAll() || await.size() == 1) {
final List<ListenableFuture<?>> readabilityFutures = new ArrayList<>();
for (final int channelNumber : await) {
final ReadableFrameChannel channel = inputChannels.get(channelNumber);
if (!channel.isFinished() && !channel.canRead()) {
readabilityFutures.add(channel.readabilityFuture());
}
}
if (readabilityFutures.isEmpty()) {
exec.execute(ExecutorRunnable.this);
} else {
runProcessorAfterFutureResolves(Futures.allAsList(readabilityFutures));
}
} else {
// Await any.
runProcessorAfterFutureResolves(awaitAnyWidget.awaitAny(await));
}
}
}
catch (Throwable e) {
fail(e);
}
}
private List<ListenableFuture<?>> gatherWritabilityFutures()
{
final List<ListenableFuture<?>> futures = new ArrayList<>();
for (final WritableFrameChannel channel : outputChannels) {
futures.add(channel.writabilityFuture());
}
return futures;
}
/**
* Executes {@link FrameProcessor#runIncrementally} on the currently-readable inputs, while respecting
* cancellation. Returns an empty Optional if the processor exited abnormally (canceled or failed). Returns a
* present Optional if the processor ran successfully. Throws an exception if the processor does.
*/
private Optional<ReturnOrAwait<T>> runProcessorNow()
{
final IntSet readableInputs = new IntOpenHashSet(inputChannels.size());
for (int i = 0; i < inputChannels.size(); i++) {
final ReadableFrameChannel channel = inputChannels.get(i);
if (channel.isFinished() || channel.canRead()) {
readableInputs.add(i);
}
}
if (cancellationId != null) {
// After this synchronized block, our thread may be interrupted by cancellations, because "cancel"
// checks "runningProcessors".
synchronized (lock) {
if (cancelableProcessors.containsEntry(cancellationId, processor)) {
runningProcessors.put(processor, Thread.currentThread());
} else {
// Processor has been canceled. We don't need to handle cleanup, because someone else did it.
return Optional.empty();
}
}
}
final String threadName = Thread.currentThread().getName();
boolean canceled = false;
Either<Throwable, ReturnOrAwait<T>> retVal;
try {
if (Thread.interrupted()) {
throw new InterruptedException();
}
if (cancellationId != null) {
// Set the thread name to something involving the cancellationId, to make thread dumps more useful.
Thread.currentThread().setName(threadName + "-" + cancellationId);
}
retVal = Either.value(processor.runIncrementally(readableInputs));
}
catch (Throwable e) {
// Catch InterruptedException too: interrupt was meant for the processor, not us.
retVal = Either.error(e);
}
finally {
if (cancellationId != null) {
// After this synchronized block, our thread will no longer be interrupted by cancellations,
// because "cancel" checks "runningProcessors".
synchronized (lock) {
if (Thread.interrupted()) {
// ignore: interrupt was meant for the processor, but came after the processor already exited.
}
runningProcessors.remove(processor);
lock.notifyAll();
if (!cancelableProcessors.containsEntry(cancellationId, processor)) {
// Processor has been canceled by one of the "cancel" methods. They will handle cleanup.
canceled = true;
}
}
// Restore original thread name.
Thread.currentThread().setName(threadName);
}
}
if (canceled) {
return Optional.empty();
} else {
return Optional.of(retVal.valueOrThrow());
}
}
private <V> void runProcessorAfterFutureResolves(final ListenableFuture<V> future)
{
final ListenableFuture<V> cancelableFuture = registerCancelableFuture(future, false, cancellationId);
Futures.addCallback(
cancelableFuture,
new FutureCallback<>()
{
@Override
public void onSuccess(final V ignored)
{
try {
exec.execute(ExecutorRunnable.this);
}
catch (Throwable e) {
fail(e);
}
}
@Override
public void onFailure(Throwable t)
{
// Ignore cancellation.
if (!cancelableFuture.isCancelled()) {
fail(t);
}
}
},
MoreExecutors.directExecutor()
);
}
/**
* Called when a processor succeeds.
*
* Runs the cleanup routine and sets the finished future to a particular value. If cleanup fails, sets the
* finished future to an error.
*/
private void succeed(T value)
{
try {
doProcessorCleanup();
}
catch (Throwable e) {
finished.setException(e);
return;
}
finished.set(value);
}
/**
* Called when a processor fails.
*
* Cancels output channels, runs the cleanup routine, and sets the finished future to an error.
*/
private void fail(Throwable e)
{
for (final WritableFrameChannel outputChannel : outputChannels) {
try {
outputChannel.fail(e);
}
catch (Throwable e1) {
e.addSuppressed(e1);
}
}
try {
doProcessorCleanup();
}
catch (Throwable e1) {
e.addSuppressed(e1);
}
finished.setException(e);
}
/**
* Called when a processor exits via {@link #succeed} or {@link #fail}. Not called when a processor
* is canceled.
*/
void doProcessorCleanup() throws IOException
{
final boolean doCleanup;
if (cancellationId != null) {
synchronized (lock) {
// Skip cleanup if the processor is no longer in cancelableProcessors. This means one of the "cancel"
// methods is going to do the cleanup.
doCleanup = cancelableProcessors.remove(cancellationId, processor);
}
} else {
doCleanup = true;
}
if (doCleanup) {
processor.cleanup();
}
}
}
final ExecutorRunnable runnable = new ExecutorRunnable();
finished.addListener(
() -> {
logProcessorStatusString(processor, finished, null);
// If the future was canceled, and the processor is cancelable, then cancel the processor too.
if (finished.isCancelled() && cancellationId != null) {
boolean didRemoveFromCancelableProcessors;
synchronized (lock) {
didRemoveFromCancelableProcessors = cancelableProcessors.remove(cancellationId, processor);
}
if (didRemoveFromCancelableProcessors) {
try {
cancel(Collections.singleton(processor));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
},
Execs.directExecutor()
);
logProcessorStatusString(processor, finished, null);
registerCancelableProcessor(processor, cancellationId);
exec.execute(runnable);
return finished;
}