public void commit()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java [913:1001]


  /**
   * Commits the forks of this task and finalizes the task state.
   *
   * <p>The method performs the following steps, in order:
   * <ol>
   *   <li>Calls {@code commit()} on every present fork that reports success, and collects the
   *       index of every present fork that either did not succeed or whose commit returned
   *       {@code false}.</li>
   *   <li>If no fork failed, sets the working state to {@code SUCCESSFUL} unless it is already
   *       {@code FAILED}. Otherwise fails the task with the throwable(s) recorded for the failed
   *       forks, or with a generic {@code ForkException} when none were recorded.</li>
   *   <li>Always (in {@code finally}): records the final state of the constructs and the writer
   *       record/byte counters, submits the task-committed event, closes {@code closer}, cancels
   *       the fork futures, publishes the task data when {@code shouldPublishDataInTask()} is true
   *       and the state is {@code SUCCESSFUL}, records end time and duration, and notifies the
   *       task state tracker.</li>
   * </ol>
   *
   * <p>A failure to close resources fails the task only when close failures are not ignored and
   * this method has not already reported a failure (a thrown exception or failed forks), so the
   * earlier failure is not reported a second time with the close exception.
   */
  public void commit() {
    // Becomes true once this method has already reported a failure through failTask().
    boolean isTaskFailed = false;

    try {
      // Check if all forks succeeded
      List<Integer> failedForkIds = new ArrayList<>();
      for (Optional<Fork> fork : this.forks.keySet()) {
        if (!fork.isPresent()) {
          continue;
        }
        Fork presentFork = fork.get();
        // A fork counts as failed if it did not succeed, or if it succeeded but its commit failed.
        if (!presentFork.isSucceeded() || !presentFork.commit()) {
          failedForkIds.add(presentFork.getIndex());
        }
      }

      if (failedForkIds.isEmpty()) {
        // Set the task state to SUCCESSFUL. The state is not set to COMMITTED
        // as the data publisher will do that upon successful data publishing.
        if (this.taskState.getWorkingState() != WorkUnitState.WorkingState.FAILED) {
          this.taskState.setWorkingState(WorkUnitState.WorkingState.SUCCESSFUL);
        }
      } else {
        ForkThrowableHolder holder = Task.getForkThrowableHolder(this.taskState.getTaskBroker());
        LOG.info("Holder for this task {} is {}", this.taskId, holder);
        if (!holder.isEmpty()) {
          if (failedForkIds.size() == 1 && holder.getThrowable(failedForkIds.get(0)).isPresent()) {
            // Exactly one failed fork with a recorded throwable: report that throwable directly.
            failTask(holder.getThrowable(failedForkIds.get(0)).get());
          } else {
            failTask(holder.getAggregatedException(failedForkIds, this.taskId));
          }
        } else {
          // just in case there are some corner cases where Fork throw an exception but doesn't add into holder
          failTask(new ForkException("Fork branches " + failedForkIds + " failed for task " + this.taskId));
        }
        // The task has been failed because of its forks; remember that so a later close failure
        // does not call failTask() again for an already failed task.
        isTaskFailed = true;
      }
    } catch (Throwable t) {
      failTask(t);
      isTaskFailed = true;
    } finally {
      addConstructsFinalStateToTaskState(extractor, converter, rowChecker);

      this.taskState.setProp(ConfigurationKeys.WRITER_RECORDS_WRITTEN, getRecordsWritten());
      this.taskState.setProp(ConfigurationKeys.WRITER_BYTES_WRITTEN, getBytesWritten());

      this.submitTaskCommittedEvent();

      try {
        closer.close();
      } catch (Throwable t) {
        LOG.error("Failed to close all open resources", t);
        // Only fail the task for a close failure when such failures are not ignored and the task
        // has not already been failed above.
        if ((!isIgnoreCloseFailures) && (!isTaskFailed)) {
          LOG.error("Setting the task state to failed.");
          failTask(t);
        }
      }

      for (Map.Entry<Optional<Fork>, Optional<Future<?>>> forkAndFuture : this.forks.entrySet()) {
        if (forkAndFuture.getKey().isPresent() && forkAndFuture.getValue().isPresent()) {
          try {
            forkAndFuture.getValue().get().cancel(true);
          } catch (Throwable t) {
            // Best effort: a failed cancellation is logged and must not stop the remaining ones.
            LOG.error(String.format("Failed to cancel Fork \"%s\"", forkAndFuture.getKey().get()), t);
          }
        }
      }

      try {
        if (shouldPublishDataInTask()) {
          // If data should be published by the task, publish the data and set the task state to COMMITTED.
          // Task data can only be published after all forks have been closed by closer.close().
          if (this.taskState.getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL) {
            publishTaskData();
            this.taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
          }
        }
      } catch (IOException ioe) {
        failTask(ioe);
      } finally {
        // Timing and the completion callback are recorded whether or not publishing succeeded.
        long endTime = System.currentTimeMillis();
        this.taskState.setEndTime(endTime);
        this.taskState.setTaskDuration(endTime - startTime);
        this.taskStateTracker.onTaskCommitCompletion(this);
      }
    }
  }