public Void call()

in gobblin-runtime/src/main/java/org/apache/gobblin/runtime/SafeDatasetCommit.java [84:202]


  /**
   * Commits this dataset's state: runs the configured {@link DataPublisher} (or a
   * {@link CommitSequence} under EXACTLY_ONCE delivery semantics), then persists the
   * resulting dataset state.
   *
   * <p>Flow: (1) short-circuit if already COMMITTED; (2) resolve the publisher class and
   * verify the commit policy allows committing this dataset's state; (3) publish per
   * task-factory grouping; (4) in a {@code finally}, finalize state, emit failure/lineage
   * events, and either execute the commit sequence or persist the dataset state.
   *
   * @return always {@code null} (required by {@code Callable<Void>})
   * @throws RuntimeException if the dataset cannot be committed under the commit policy,
   *         publisher instantiation fails, publishing fails, or state persistence fails
   */
  public Void call()
      throws Exception {
    // Idempotence guard: a dataset already committed (e.g. by a prior attempt) is skipped.
    if (this.datasetState.getState() == JobState.RunningState.COMMITTED) {
      log.info(this.datasetUrn + " have been committed.");
      return null;
    }
    metricContext = Instrumented.getMetricContext(datasetState, SafeDatasetCommit.class);

    finalizeDatasetStateBeforeCommit(this.datasetState);
    Class<? extends DataPublisher> dataPublisherClass;
    try (Closer closer = Closer.create()) {
      // Job-level publisher class wins; otherwise fall back to the global default type.
      dataPublisherClass = JobContext.getJobDataPublisherClass(this.jobContext.getJobState())
          .or((Class<? extends DataPublisher>) Class.forName(ConfigurationKeys.DEFAULT_DATA_PUBLISHER_TYPE));
      if (!canCommitDataset(datasetState)) {
        log.warn(String
            .format("Not committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn,
                this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
        // Give unpublished work units a chance to be handled (e.g. retried) before failing.
        checkForUnpublishedWUHandling(this.datasetUrn, this.datasetState, dataPublisherClass, closer);
        throw new RuntimeException(String
            .format("Not committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn,
                this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));
      }
    } catch (ReflectiveOperationException roe) {
      log.error("Failed to instantiate data publisher for dataset {} of job {}.", this.datasetUrn,
          this.jobContext.getJobId(), roe);
      throw new RuntimeException(roe);
    } finally {
      // Emitted regardless of outcome; helper decides internally whether a failure occurred.
      maySubmitFailureEvent(datasetState);
    }

    if (this.isJobCancelled) {
      log.info("Executing commit steps although job is cancelled due to job commit policy: " + this.jobContext
          .getJobCommitPolicy());
    }

    Optional<CommitSequence.Builder> commitSequenceBuilder = Optional.absent();
    // Set to false only when a cancelled job has a publisher with unfinished, unskippable work.
    boolean canPersistStates = true;
    try (Closer closer = Closer.create()) {
      if (this.shouldCommitDataInJob) {
        log.info(String.format("Committing dataset %s of job %s with commit policy %s and state %s", this.datasetUrn,
            this.jobContext.getJobId(), this.jobContext.getJobCommitPolicy(), this.datasetState.getState()));

        // Task states may come from different task factories, each needing its own publisher.
        ListMultimap<TaskFactoryWrapper, TaskState> taskStatesByFactory = groupByTaskFactory(this.datasetState);

        for (Map.Entry<TaskFactoryWrapper, Collection<TaskState>> entry : taskStatesByFactory.asMap().entrySet()) {
          TaskFactory taskFactory = entry.getKey().getTaskFactory();

          if (this.deliverySemantics == DeliverySemantics.EXACTLY_ONCE) {
            if (taskFactory != null) {
              throw new RuntimeException("Custom task factories do not support exactly once delivery semantics.");
            }
            // BUG FIX: the returned builder was previously discarded, leaving
            // commitSequenceBuilder permanently absent and making the commit-sequence
            // execution branch in the finally block unreachable. Capture it so
            // EXACTLY_ONCE semantics actually build and run the commit sequence.
            commitSequenceBuilder = generateCommitSequenceBuilder(this.datasetState, entry.getValue());
          } else {
            DataPublisher publisher;

            if (taskFactory == null) {
              publisher = DataPublisherFactory.get(dataPublisherClass.getName(), this.jobContext.getJobState(),
                  this.jobContext.getJobBroker());

              // non-threadsafe publishers are not shareable and are not retained in the broker, so register them with
              // the closer
              if (!DataPublisherFactory.isPublisherCacheable(publisher)) {
                closer.register(publisher);
              }
            } else {
              // NOTE: sharing of publishers is not supported when they are instantiated through the TaskFactory.
              // This should be revisited if sharing is required.
              publisher = taskFactory.createDataPublisher(this.datasetState);
            }

            if (this.isJobCancelled) {
              if (publisher.canBeSkipped()) {
                log.warn(publisher.getClass() + " will be skipped.");
              } else {
                // Unfinished, unskippable work under cancellation: persisting state would
                // record progress that never happened, so block persistence and fail.
                canPersistStates = false;
                throw new RuntimeException(
                    "Cannot persist state upon cancellation because publisher has unfinished work and cannot be skipped.");
              }
            } else if (this.isMultithreaded && !publisher.isThreadSafe()) {
              log.warn(String.format(
                  "Gobblin is set up to parallelize publishing, however the publisher %s is not thread-safe. "
                      + "Falling back to serial publishing.", publisher.getClass().getName()));
              safeCommitDataset(entry.getValue(), publisher);
            } else {
              commitDataset(entry.getValue(), publisher);
            }
          }
        }
        this.datasetState.setState(JobState.RunningState.COMMITTED);
      } else {
        // Data was committed at the task level; only promote a successful run to COMMITTED.
        if (this.datasetState.getState() == JobState.RunningState.SUCCESSFUL) {
          this.datasetState.setState(JobState.RunningState.COMMITTED);
        }
      }
    } catch (Throwable throwable) {
      log.error(String.format("Failed to commit dataset state for dataset %s of job %s", this.datasetUrn,
          this.jobContext.getJobId()), throwable);
      throw new RuntimeException(throwable);
    } finally {
      try {
        // State finalization and event submission happen on both success and failure paths.
        finalizeDatasetState(datasetState, datasetUrn);
        maySubmitFailureEvent(datasetState);
        maySubmitLineageEvent(datasetState);
        if (commitSequenceBuilder.isPresent()) {
          // EXACTLY_ONCE path: the commit sequence persists state as part of its execution.
          buildAndExecuteCommitSequence(commitSequenceBuilder.get(), datasetState, datasetUrn);
          datasetState.setState(JobState.RunningState.COMMITTED);
        } else if (canPersistStates) {
          persistDatasetState(datasetUrn, datasetState);
        }

      } catch (IOException | RuntimeException ioe) {
        log.error(String
            .format("Failed to persist dataset state for dataset %s of job %s", datasetUrn, this.jobContext.getJobId()),
            ioe);
        throw new RuntimeException(ioe);
      }
    }
    return null;
  }