protected Collection generateCopyEntities()

in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java [78:183]


  /**
   * Builds the copy entities needed to replicate {@link #partition} to the target.
   *
   * <p>The work proceeds through the timed stages below, in order:
   * <ol>
   *   <li>add the steps shared with the rest of the table copy;</li>
   *   <li>compute the target location and the desired target partition;</li>
   *   <li>if a target partition already exists, check it for compatibility; on incompatibility either
   *       schedule its deregistration (replace policies), or abort the copy of this partition;</li>
   *   <li>apply the optional fast partition skip predicate;</li>
   *   <li>add a post-publish step that registers the target partition;</li>
   *   <li>diff source and target locations, add a pre-publish delete step for paths to delete, and add one
   *       {@link CopyableFile} per file to copy.</li>
   * </ol>
   *
   * @return the generated copy entities; an empty list when the partition is skipped by the fast partition
   *         skip predicate, or when the partitions are incompatible, replacement is not allowed, and the job
   *         commit policy is {@code COMMIT_SUCCESSFUL_TASKS}
   * @throws IOException if the existing target partition is incompatible with the source, the existing entity
   *         policy does not allow replacing partitions, and partial commits are not allowed; may also
   *         propagate from the helper calls made here
   */
  protected Collection<CopyEntity> generateCopyEntities() throws IOException {

    try (Closer closer = Closer.create()) {
      MultiTimingEvent multiTimer = closer.register(new MultiTimingEvent(this.eventSubmitter, "PartitionCopy", true));

      int stepPriority = 0;
      // The JSON form of the partition values is used as the file set name for every entity created below.
      String fileSet = HiveCopyEntityHelper.gson.toJson(this.partition.getValues());

      List<CopyEntity> copyEntities = Lists.newArrayList();

      stepPriority = hiveCopyEntityHelper.addSharedSteps(copyEntities, fileSet, stepPriority);

      multiTimer.nextStage(HiveCopyEntityHelper.Stages.COMPUTE_TARGETS);
      Path targetPath = hiveCopyEntityHelper.getTargetLocation(hiveCopyEntityHelper.getTargetFs(),
          this.partition.getDataLocation(), Optional.of(this.partition));
      Partition targetPartition = getTargetPartition(this.partition, targetPath);

      multiTimer.nextStage(HiveCopyEntityHelper.Stages.EXISTING_PARTITION);
      if (this.existingTargetPartition.isPresent()) {
        hiveCopyEntityHelper.getTargetPartitions().remove(this.partition.getValues());
        try {
          checkPartitionCompatibility(targetPartition, this.existingTargetPartition.get());
        } catch (IOException ioe) {
          if (!isExistingPartitionReplaceable()) {
            log.error("Source and target partitions are not compatible. Aborting copy of partition {}",
                this.partition, ioe);
            // Silence error and continue processing workunits if we allow partial success
            if (isPartialCommitAllowed()) {
              return Lists.newArrayList();
            }
            throw ioe;
          }
          log.warn("Source and target partitions are not compatible. Will override target partition: {}",
              ioe.getMessage());
          log.debug("Incompatibility details: ", ioe);
          stepPriority = hiveCopyEntityHelper.addPartitionDeregisterSteps(copyEntities, fileSet, stepPriority,
              hiveCopyEntityHelper.getTargetTable(), this.existingTargetPartition.get());
          // From here on, behave as if the target partition did not exist.
          this.existingTargetPartition = Optional.absent();
        }
      }

      multiTimer.nextStage(HiveCopyEntityHelper.Stages.PARTITION_SKIP_PREDICATE);
      if (hiveCopyEntityHelper.getFastPartitionSkip().isPresent()
          && hiveCopyEntityHelper.getFastPartitionSkip().get().apply(this)) {
        log.info("Skipping copy of partition {} due to fast partition skip predicate.",
            this.partition.getCompleteName());
        // Any steps accumulated so far are intentionally discarded.
        return Lists.newArrayList();
      }

      HiveSpec partitionHiveSpec = new SimpleHiveSpec.Builder<>(targetPath)
          .withTable(HiveMetaStoreUtils.getHiveTable(hiveCopyEntityHelper.getTargetTable().getTTable()))
          .withPartition(Optional.of(HiveMetaStoreUtils.getHivePartition(targetPartition.getTPartition()))).build();
      HiveRegisterStep register = new HiveRegisterStep(hiveCopyEntityHelper.getTargetMetastoreURI(), partitionHiveSpec,
          hiveCopyEntityHelper.getHiveRegProps());
      copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String> newHashMap(), register, stepPriority++));

      multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_LOCATIONS);
      HiveLocationDescriptor sourceLocation =
          HiveLocationDescriptor.forPartition(this.partition, hiveCopyEntityHelper.getDataset().fs, this.properties);
      HiveLocationDescriptor desiredTargetLocation =
          HiveLocationDescriptor.forPartition(targetPartition, hiveCopyEntityHelper.getTargetFs(), this.properties);
      Optional<HiveLocationDescriptor> existingTargetLocation = this.existingTargetPartition.isPresent()
          ? Optional.of(HiveLocationDescriptor.forPartition(this.existingTargetPartition.get(),
              hiveCopyEntityHelper.getTargetFs(), this.properties))
          : Optional.<HiveLocationDescriptor> absent();

      multiTimer.nextStage(HiveCopyEntityHelper.Stages.FULL_PATH_DIFF);
      HiveCopyEntityHelper.DiffPathSet diffPathSet =
          HiveCopyEntityHelper.fullPathDiff(sourceLocation, desiredTargetLocation, existingTargetLocation,
              Optional.<Partition> absent(), multiTimer, hiveCopyEntityHelper);

      multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_DELETE_UNITS);
      if (!diffPathSet.pathsToDelete.isEmpty()) {
        DeleteFileCommitStep deleteStep = DeleteFileCommitStep.fromPaths(hiveCopyEntityHelper.getTargetFs(),
            diffPathSet.pathsToDelete, hiveCopyEntityHelper.getDataset().properties);
        copyEntities.add(new PrePublishStep(fileSet, Maps.<String, String> newHashMap(), deleteStep, stepPriority++));
      }

      multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_COPY_UNITS);
      for (CopyableFile.Builder builder : hiveCopyEntityHelper.getCopyableFilesFromPaths(diffPathSet.filesToCopy,
          hiveCopyEntityHelper.getConfiguration(), Optional.of(this.partition))) {
        CopyableFile fileEntity =
            builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(desiredTargetLocation.location.toString())
                .build();
        describeSourceAndDestination(fileEntity);
        copyEntities.add(fileEntity);
      }

      log.info("Created {} copy entities for partition {}", copyEntities.size(), this.partition.getCompleteName());

      return copyEntities;
    }
  }

  /**
   * Returns whether the existing entity policy is one of the two partition-replacing policies
   * ({@code REPLACE_PARTITIONS} or {@code REPLACE_TABLE_AND_PARTITIONS}).
   */
  private boolean isExistingPartitionReplaceable() {
    HiveCopyEntityHelper.ExistingEntityPolicy policy = hiveCopyEntityHelper.getExistingEntityPolicy();
    return policy == HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_PARTITIONS
        || policy == HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE_AND_PARTITIONS;
  }

  /**
   * Returns whether the configured job commit policy equals {@code COMMIT_SUCCESSFUL_TASKS}; when the key is
   * not configured, {@code COMMIT_ON_FULL_SUCCESS} is used as the default.
   */
  private boolean isPartialCommitAllowed() {
    String commitPolicy = ConfigUtils.getString(hiveCopyEntityHelper.getConfiguration().getConfig(),
        ConfigurationKeys.JOB_COMMIT_POLICY_KEY, JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.toString());
    // Constant-first comparison stays safe even if the lookup were ever to yield null.
    return JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS.toString().equals(commitPolicy);
  }

  /**
   * Sets the source and destination partition descriptors on {@code fileEntity}.
   *
   * @param fileEntity the copyable file to annotate
   */
  private void describeSourceAndDestination(CopyableFile fileEntity) {
    DatasetDescriptor sourceDataset = this.hiveCopyEntityHelper.getSourceDataset();
    PartitionDescriptor source = new PartitionDescriptor(this.partition.getName(), sourceDataset);
    fileEntity.setSourceData(source);

    DatasetDescriptor destinationDataset = this.hiveCopyEntityHelper.getDestinationDataset();
    // NOTE(review): when no existing target partition is present this falls back to the source partition,
    // not the target partition computed in generateCopyEntities — confirm this is intended.
    Partition destinationPartition =
        this.existingTargetPartition.isPresent() ? this.existingTargetPartition.get() : this.partition;
    PartitionDescriptor destination =
        new PartitionDescriptor(destinationPartition.getName(), destinationDataset);
    fileEntity.setDestinationData(destination);
  }