in core/src/main/java/org/apache/iceberg/util/Tasks.java [294:400]
private <E extends Exception> boolean runParallel(
final Task<I, E> task, Class<E> exceptionClass) throws E {
final Queue<I> succeeded = new ConcurrentLinkedQueue<>();
final Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
final AtomicBoolean taskFailed = new AtomicBoolean(false);
final AtomicBoolean abortFailed = new AtomicBoolean(false);
final AtomicBoolean revertFailed = new AtomicBoolean(false);
List<Future<?>> futures = Lists.newArrayList();
for (final I item : items) {
// submit a task for each item that will either run or abort the task
futures.add(
service.submit(
new Runnable() {
@Override
public void run() {
if (!(stopOnFailure && taskFailed.get())) {
// run the task with retries
boolean threw = true;
try {
runTaskWithRetry(task, item);
succeeded.add(item);
threw = false;
} catch (Exception e) {
taskFailed.set(true);
exceptions.add(e);
if (onFailure != null) {
tryRunOnFailure(item, e);
}
} finally {
if (threw) {
taskFailed.set(true);
}
}
} else if (abortTask != null) {
// abort the task instead of running it
if (stopAbortsOnFailure && abortFailed.get()) {
return;
}
boolean failed = true;
try {
abortTask.run(item);
failed = false;
} catch (Exception e) {
LOG.error("Failed to abort task", e);
// swallow the exception
} finally {
if (failed) {
abortFailed.set(true);
}
}
}
}
}));
}
// let the above tasks complete (or abort)
exceptions.addAll(waitFor(futures));
futures.clear();
if (taskFailed.get() && revertTask != null) {
// at least one task failed, revert any that succeeded
for (final I item : succeeded) {
futures.add(
service.submit(
new Runnable() {
@Override
public void run() {
if (stopRevertsOnFailure && revertFailed.get()) {
return;
}
boolean failed = true;
try {
revertTask.run(item);
failed = false;
} catch (Exception e) {
LOG.error("Failed to revert task", e);
// swallow the exception
} finally {
if (failed) {
revertFailed.set(true);
}
}
}
}));
}
// let the revert tasks complete
exceptions.addAll(waitFor(futures));
}
if (throwFailureWhenFinished && !exceptions.isEmpty()) {
Tasks.throwOne(exceptions, exceptionClass);
} else if (throwFailureWhenFinished && taskFailed.get()) {
throw new RuntimeException("Task set failed with an uncaught throwable");
}
return !taskFailed.get();
}