public void publishData()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java [148:270]


  /**
   * Publishes the converted data for a batch of work units and then cleans up staging entities.
   *
   * <p>If any work unit matches {@code UNSUCCESSFUL_WORKUNIT}, nothing is published: the cleanup
   * queries and cleanup directories of every work unit are collected, each work unit is marked
   * {@link WorkingState#FAILED}, and a conversion-failed SLA event is submitted for every work unit
   * that is not a watermark work unit.
   *
   * <p>Otherwise the work units are processed in {@code PARTITION_PUBLISH_ORDERING} order: publish
   * directories are moved into place, the collected publish queries are executed in one batch, and
   * each work unit is marked {@link WorkingState#COMMITTED}, has its actual high watermark set and,
   * unless it is a watermark work unit, gets a conversion-successful SLA event and (when applicable)
   * destination lineage info.
   *
   * <p>In both cases partition parameters are preserved and the collected cleanup queries and
   * directories are processed afterwards. Cleanup failures are logged and never propagated.
   *
   * @param states work unit states of the job being published
   * @throws IOException propagated from the publish steps this method invokes (the throwing helpers
   *     are defined outside this method)
   */
  public void publishData(Collection<? extends WorkUnitState> states) throws IOException {

    // Linked sets drop duplicate queries shared by several work units while keeping first-seen order.
    Set<String> cleanUpQueries = Sets.newLinkedHashSet();
    Set<String> publishQueries = Sets.newLinkedHashSet();
    List<String> directoriesToDelete = Lists.newArrayList();

    try {
      if (Iterables.tryFind(states, UNSUCCESSFUL_WORKUNIT).isPresent()) {
        /////////////////////////////////////////
        // Prepare cleanup and ignore publish
        /////////////////////////////////////////
        for (WorkUnitState wus : states) {
          QueryBasedHivePublishEntity publishEntity = HiveAvroORCQueryGenerator.deserializePublishCommands(wus);

          // Add cleanup commands - to be executed later
          if (publishEntity.getCleanupQueries() != null) {
            cleanUpQueries.addAll(publishEntity.getCleanupQueries());
          }

          if (publishEntity.getCleanupDirectories() != null) {
            directoriesToDelete.addAll(publishEntity.getCleanupDirectories());
          }

          EventWorkunitUtils.setBeginPublishDDLExecuteTimeMetadata(wus, System.currentTimeMillis());
          // One unsuccessful work unit fails the whole batch, so every work unit is marked FAILED.
          wus.setWorkingState(WorkingState.FAILED);
          if (!wus.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)) {
            try {
              new SlaEventSubmitter(eventSubmitter, EventConstants.CONVERSION_FAILED_EVENT, wus.getProperties()).submit();
            } catch (Exception e) {
              // Event emission is best-effort; the remaining work units must still be processed.
              log.error("Failed while emitting SLA event, but ignoring and moving forward to curate all clean up commands", e);
            }
          }
        }
      } else {
        /////////////////////////////////////////
        // Prepare publish and cleanup commands
        /////////////////////////////////////////
        for (WorkUnitState wus : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
          QueryBasedHivePublishEntity publishEntity = HiveAvroORCQueryGenerator.deserializePublishCommands(wus);

          // Add cleanup commands - to be executed later
          if (publishEntity.getCleanupQueries() != null) {
            cleanUpQueries.addAll(publishEntity.getCleanupQueries());
          }

          if (publishEntity.getCleanupDirectories() != null) {
            directoriesToDelete.addAll(publishEntity.getCleanupDirectories());
          }

          if (publishEntity.getPublishDirectories() != null) {
            // Publish snapshot / partition directories
            Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
            for (Map.Entry<String, String> publishDir : publishDirectories.entrySet()) {
              moveDirectory(publishDir.getKey(), publishDir.getValue());
            }
          }

          if (publishEntity.getPublishQueries() != null) {
            publishQueries.addAll(publishEntity.getPublishQueries());
          }
        }

        /////////////////////////////////////////
        // Core publish
        /////////////////////////////////////////

        // Update publish start timestamp on all workunits
        for (WorkUnitState wus : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
          if (HiveAvroORCQueryGenerator.deserializePublishCommands(wus).getPublishQueries() != null) {
            EventWorkunitUtils.setBeginPublishDDLExecuteTimeMetadata(wus, System.currentTimeMillis());
          }
        }

        // Actual publish: Register snapshot / partition
        executeQueries(Lists.newArrayList(publishQueries));

        // Update publish completion timestamp on all workunits
        for (WorkUnitState wus : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
          if (HiveAvroORCQueryGenerator.deserializePublishCommands(wus).getPublishQueries() != null) {
            EventWorkunitUtils.setEndPublishDDLExecuteTimeMetadata(wus, System.currentTimeMillis());
          }

          wus.setWorkingState(WorkingState.COMMITTED);
          this.watermarker.setActualHighWatermark(wus);

          // Emit an SLA event for conversion successful
          if (!wus.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)) {
            EventWorkunitUtils.setIsFirstPublishMetadata(wus);
            try {
              new SlaEventSubmitter(eventSubmitter, EventConstants.CONVERSION_SUCCESSFUL_SLA_EVENT, wus.getProperties())
                  .submit();
            } catch (Exception e) {
              // Event emission is best-effort; a failure here must not undo a successful publish.
              log.error("Failed while emitting SLA event, but ignoring and moving forward to curate all clean up commands", e);
            }
            if (LineageUtils.shouldSetLineageInfo(wus)) {
              setDestLineageInfo(wus, this.lineageInfo);
            }
          }
        }
      }
    } finally {
      try {
        /////////////////////////////////////////
        // Preserving partition params
        /////////////////////////////////////////
        preservePartitionParams(states);
      } finally {
        /////////////////////////////////////////
        // Post publish cleanup
        /////////////////////////////////////////
        // Nested finally: staging cleanup must still run if preserving partition params throws;
        // that exception keeps propagating once cleanup has been attempted.

        // Execute cleanup commands
        try {
          executeQueries(Lists.newArrayList(cleanUpQueries));
        } catch (Exception e) {
          log.error("Failed to cleanup staging entities in Hive metastore.", e);
        }
        try {
          deleteDirectories(directoriesToDelete);
        } catch (Exception e) {
          log.error("Failed to cleanup staging directories.", e);
        }
      }
    }
  }