private boolean runParallel()

in core/src/main/java/org/apache/iceberg/util/Tasks.java [294:400]


    private <E extends Exception> boolean runParallel(
        final Task<I, E> task, Class<E> exceptionClass) throws E {
      final Queue<I> succeeded = new ConcurrentLinkedQueue<>();
      final Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
      final AtomicBoolean taskFailed = new AtomicBoolean(false);
      final AtomicBoolean abortFailed = new AtomicBoolean(false);
      final AtomicBoolean revertFailed = new AtomicBoolean(false);

      List<Future<?>> futures = Lists.newArrayList();

      for (final I item : items) {
        // submit a task for each item that will either run or abort the task
        futures.add(
            service.submit(
                new Runnable() {
                  @Override
                  public void run() {
                    if (!(stopOnFailure && taskFailed.get())) {
                      // run the task with retries
                      boolean threw = true;
                      try {
                        runTaskWithRetry(task, item);

                        succeeded.add(item);

                        threw = false;

                      } catch (Exception e) {
                        taskFailed.set(true);
                        exceptions.add(e);

                        if (onFailure != null) {
                          tryRunOnFailure(item, e);
                        }
                      } finally {
                        if (threw) {
                          taskFailed.set(true);
                        }
                      }

                    } else if (abortTask != null) {
                      // abort the task instead of running it
                      if (stopAbortsOnFailure && abortFailed.get()) {
                        return;
                      }

                      boolean failed = true;
                      try {
                        abortTask.run(item);
                        failed = false;
                      } catch (Exception e) {
                        LOG.error("Failed to abort task", e);
                        // swallow the exception
                      } finally {
                        if (failed) {
                          abortFailed.set(true);
                        }
                      }
                    }
                  }
                }));
      }

      // let the above tasks complete (or abort)
      exceptions.addAll(waitFor(futures));
      futures.clear();

      if (taskFailed.get() && revertTask != null) {
        // at least one task failed, revert any that succeeded
        for (final I item : succeeded) {
          futures.add(
              service.submit(
                  new Runnable() {
                    @Override
                    public void run() {
                      if (stopRevertsOnFailure && revertFailed.get()) {
                        return;
                      }

                      boolean failed = true;
                      try {
                        revertTask.run(item);
                        failed = false;
                      } catch (Exception e) {
                        LOG.error("Failed to revert task", e);
                        // swallow the exception
                      } finally {
                        if (failed) {
                          revertFailed.set(true);
                        }
                      }
                    }
                  }));
        }

        // let the revert tasks complete
        exceptions.addAll(waitFor(futures));
      }

      if (throwFailureWhenFinished && !exceptions.isEmpty()) {
        Tasks.throwOne(exceptions, exceptionClass);
      } else if (throwFailureWhenFinished && taskFailed.get()) {
        throw new RuntimeException("Task set failed with an uncaught throwable");
      }

      return !taskFailed.get();
    }