in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java [78:183]
/**
 * Builds the copy entities needed to copy this Hive partition to the target.
 *
 * <p>Entities are appended in this order: the shared steps added by
 * {@code HiveCopyEntityHelper#addSharedSteps}, a deregister step when an incompatible existing
 * target partition is going to be overridden, a post-publish Hive registration step for the target
 * partition, an optional pre-publish step deleting the target paths reported by
 * {@code HiveCopyEntityHelper#fullPathDiff}, and one {@link CopyableFile} per file to copy.
 *
 * <p>An empty collection is returned when the fast partition skip predicate accepts this file set,
 * or when the existing target partition is incompatible, the existing-entity policy is neither
 * {@code REPLACE_PARTITIONS} nor {@code REPLACE_TABLE_AND_PARTITIONS}, and the job commit policy
 * is {@code COMMIT_SUCCESSFUL_TASKS}.
 *
 * <p>Side effects: when a target partition already exists, this partition's values are removed from
 * the helper's target partitions; when that existing partition is going to be overridden,
 * {@code existingTargetPartition} is reset to absent.
 *
 * @return the copy entities for this partition (possibly empty)
 * @throws IOException if the existing target partition is incompatible and the job may neither
 *     replace it nor commit partial success, or if any of the helper calls throws
 */
protected Collection<CopyEntity> generateCopyEntities() throws IOException {
  try (Closer closer = Closer.create()) {
    MultiTimingEvent multiTimer =
        closer.register(new MultiTimingEvent(this.eventSubmitter, "PartitionCopy", true));

    int stepPriority = 0;
    // Every entity of this partition uses the JSON form of the partition values as its file set.
    String fileSet = HiveCopyEntityHelper.gson.toJson(this.partition.getValues());
    List<CopyEntity> copyEntities = Lists.newArrayList();
    stepPriority = this.hiveCopyEntityHelper.addSharedSteps(copyEntities, fileSet, stepPriority);

    multiTimer.nextStage(HiveCopyEntityHelper.Stages.COMPUTE_TARGETS);
    Path targetPath = this.hiveCopyEntityHelper.getTargetLocation(this.hiveCopyEntityHelper.getTargetFs(),
        this.partition.getDataLocation(), Optional.of(this.partition));
    Partition targetPartition = getTargetPartition(this.partition, targetPath);

    multiTimer.nextStage(HiveCopyEntityHelper.Stages.EXISTING_PARTITION);
    if (this.existingTargetPartition.isPresent()) {
      // This partition is handled here, so remove it from the helper's collection of target partitions.
      this.hiveCopyEntityHelper.getTargetPartitions().remove(this.partition.getValues());
      try {
        checkPartitionCompatibility(targetPartition, this.existingTargetPartition.get());
      } catch (IOException ioe) {
        HiveCopyEntityHelper.ExistingEntityPolicy existingEntityPolicy =
            this.hiveCopyEntityHelper.getExistingEntityPolicy();
        boolean mayReplacePartition =
            existingEntityPolicy == HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_PARTITIONS
                || existingEntityPolicy == HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE_AND_PARTITIONS;
        if (!mayReplacePartition) {
          // Trailing throwable is logged as the exception by SLF4J, not consumed by the placeholder.
          log.error("Source and target partitions are not compatible. Aborting copy of partition {}",
              this.partition, ioe);
          String commitPolicy = ConfigUtils.getString(this.hiveCopyEntityHelper.getConfiguration().getConfig(),
              ConfigurationKeys.JOB_COMMIT_POLICY_KEY, JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.toString());
          // Silence error and continue processing workunits if we allow partial success
          if (JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS.toString().equals(commitPolicy)) {
            return Lists.newArrayList();
          }
          throw ioe;
        }
        log.warn("Source and target partitions are not compatible. Will override target partition: {}",
            ioe.getMessage());
        log.debug("Incompatibility details: ", ioe);
        stepPriority = this.hiveCopyEntityHelper.addPartitionDeregisterSteps(copyEntities, fileSet, stepPriority,
            this.hiveCopyEntityHelper.getTargetTable(), this.existingTargetPartition.get());
        // The existing partition will be deregistered, so it must not be diffed against below.
        this.existingTargetPartition = Optional.absent();
      }
    }

    multiTimer.nextStage(HiveCopyEntityHelper.Stages.PARTITION_SKIP_PREDICATE);
    if (this.hiveCopyEntityHelper.getFastPartitionSkip().isPresent()
        && this.hiveCopyEntityHelper.getFastPartitionSkip().get().apply(this)) {
      log.info("Skipping copy of partition {} due to fast partition skip predicate.",
          this.partition.getCompleteName());
      return Lists.newArrayList();
    }

    HiveSpec partitionHiveSpec = new SimpleHiveSpec.Builder<>(targetPath)
        .withTable(HiveMetaStoreUtils.getHiveTable(this.hiveCopyEntityHelper.getTargetTable().getTTable()))
        .withPartition(Optional.of(HiveMetaStoreUtils.getHivePartition(targetPartition.getTPartition())))
        .build();
    HiveRegisterStep register = new HiveRegisterStep(this.hiveCopyEntityHelper.getTargetMetastoreURI(),
        partitionHiveSpec, this.hiveCopyEntityHelper.getHiveRegProps());
    copyEntities.add(new PostPublishStep(fileSet, Maps.<String, String> newHashMap(), register, stepPriority++));

    multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_LOCATIONS);
    HiveLocationDescriptor sourceLocation =
        HiveLocationDescriptor.forPartition(this.partition, this.hiveCopyEntityHelper.getDataset().fs, this.properties);
    HiveLocationDescriptor desiredTargetLocation =
        HiveLocationDescriptor.forPartition(targetPartition, this.hiveCopyEntityHelper.getTargetFs(), this.properties);
    Optional<HiveLocationDescriptor> existingTargetLocation = this.existingTargetPartition.isPresent()
        ? Optional.of(HiveLocationDescriptor.forPartition(this.existingTargetPartition.get(),
            this.hiveCopyEntityHelper.getTargetFs(), this.properties))
        : Optional.<HiveLocationDescriptor> absent();

    multiTimer.nextStage(HiveCopyEntityHelper.Stages.FULL_PATH_DIFF);
    HiveCopyEntityHelper.DiffPathSet diffPathSet = HiveCopyEntityHelper.fullPathDiff(sourceLocation,
        desiredTargetLocation, existingTargetLocation, Optional.<Partition> absent(), multiTimer,
        this.hiveCopyEntityHelper);

    multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_DELETE_UNITS);
    if (!diffPathSet.pathsToDelete.isEmpty()) {
      DeleteFileCommitStep deleteStep = DeleteFileCommitStep.fromPaths(this.hiveCopyEntityHelper.getTargetFs(),
          diffPathSet.pathsToDelete, this.hiveCopyEntityHelper.getDataset().properties);
      copyEntities.add(new PrePublishStep(fileSet, Maps.<String, String> newHashMap(), deleteStep, stepPriority++));
    }

    multiTimer.nextStage(HiveCopyEntityHelper.Stages.CREATE_COPY_UNITS);
    Collection<CopyableFile.Builder> builders = this.hiveCopyEntityHelper.getCopyableFilesFromPaths(
        diffPathSet.filesToCopy, this.hiveCopyEntityHelper.getConfiguration(), Optional.of(this.partition));
    if (!builders.isEmpty()) {
      // Lineage descriptors and the output path are the same for every file of this partition,
      // so build them once instead of once per file.
      String datasetOutputPath = desiredTargetLocation.location.toString();
      PartitionDescriptor sourceDescriptor =
          new PartitionDescriptor(this.partition.getName(), this.hiveCopyEntityHelper.getSourceDataset());
      Partition destinationPartition =
          this.existingTargetPartition.isPresent() ? this.existingTargetPartition.get() : this.partition;
      PartitionDescriptor destinationDescriptor =
          new PartitionDescriptor(destinationPartition.getName(), this.hiveCopyEntityHelper.getDestinationDataset());
      for (CopyableFile.Builder builder : builders) {
        CopyableFile fileEntity =
            builder.fileSet(fileSet).checksum(new byte[0]).datasetOutputPath(datasetOutputPath).build();
        fileEntity.setSourceData(sourceDescriptor);
        fileEntity.setDestinationData(destinationDescriptor);
        copyEntities.add(fileEntity);
      }
    }

    log.info("Created {} copy entities for partition {}", copyEntities.size(), this.partition.getCompleteName());
    return copyEntities;
  }
}