in gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/publisher/HiveConvertPublisher.java [148:270]
public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
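// Overall flow: collect cleanup queries/directories from every workunit first, then either
// publish (move staging directories, execute publish DDL in partition order, commit and
// advance watermarks) or, if any workunit failed, skip publishing and mark everything FAILED.
// Cleanup always runs in the finally block.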
Set<String> cleanUpQueries = Sets.newLinkedHashSet();
Set<String> publishQueries = Sets.newLinkedHashSet();
List<String> directoriesToDelete = Lists.newArrayList();
try {
if (Iterables.tryFind(states, UNSUCCESSFUL_WORKUNIT).isPresent()) {
/////////////////////////////////////////
// Prepare cleanup and ignore publish
/////////////////////////////////////////
for (WorkUnitState wus : states) {
QueryBasedHivePublishEntity publishEntity = HiveAvroORCQueryGenerator.deserializePublishCommands(wus);
// Add cleanup commands - to be executed later
if (publishEntity.getCleanupQueries() != null) {
cleanUpQueries.addAll(publishEntity.getCleanupQueries());
}
if (publishEntity.getCleanupDirectories() != null) {
directoriesToDelete.addAll(publishEntity.getCleanupDirectories());
}
EventWorkunitUtils.setBeginPublishDDLExecuteTimeMetadata(wus, System.currentTimeMillis());
wus.setWorkingState(WorkingState.FAILED);
if (!wus.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)) {
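// Watermark bookkeeping workunits (here and in the success path below) do not represent an
// actual conversion, so no SLA event is emitted for them.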
try {
new SlaEventSubmitter(eventSubmitter, EventConstants.CONVERSION_FAILED_EVENT, wus.getProperties()).submit();
} catch (Exception e) {
log.error("Failed while emitting SLA event, but ignoring and moving forward to curate " + "all clean up comamnds", e);
}
}
}
} else {
/////////////////////////////////////////
// Prepare publish and cleanup commands
/////////////////////////////////////////
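// Workunits are processed in a deterministic partition order (PARTITION_PUBLISH_ORDERING)
// so that directories are moved and publish DDL is issued in a consistent sequence.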
for (WorkUnitState wus : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
QueryBasedHivePublishEntity publishEntity = HiveAvroORCQueryGenerator.deserializePublishCommands(wus);
// Add cleanup commands - to be executed later
if (publishEntity.getCleanupQueries() != null) {
cleanUpQueries.addAll(publishEntity.getCleanupQueries());
}
if (publishEntity.getCleanupDirectories() != null) {
directoriesToDelete.addAll(publishEntity.getCleanupDirectories());
}
if (publishEntity.getPublishDirectories() != null) {
// Publish snapshot / partition directories
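// Each entry maps a staging directory (key) to its final publish location (value).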
Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
for (Map.Entry<String, String> publishDir : publishDirectories.entrySet()) {
moveDirectory(publishDir.getKey(), publishDir.getValue());
}
}
if (publishEntity.getPublishQueries() != null) {
publishQueries.addAll(publishEntity.getPublishQueries());
}
}
/////////////////////////////////////////
// Core publish
/////////////////////////////////////////
// Record the publish DDL start timestamp on workunits that have publish queries
for (WorkUnitState wus : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
if (HiveAvroORCQueryGenerator.deserializePublishCommands(wus).getPublishQueries() != null) {
EventWorkunitUtils.setBeginPublishDDLExecuteTimeMetadata(wus, System.currentTimeMillis());
}
}
// Actual publish: Register snapshot / partition
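// All collected publish DDL statements run here as a single ordered batch; if any statement
// fails, the exception propagates and the workunits below are never marked COMMITTED
// (cleanup still runs in the finally block).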
executeQueries(Lists.newArrayList(publishQueries));
// Record the publish DDL completion timestamp, then commit each workunit
for (WorkUnitState wus : PARTITION_PUBLISH_ORDERING.sortedCopy(states)) {
if (HiveAvroORCQueryGenerator.deserializePublishCommands(wus).getPublishQueries() != null) {
EventWorkunitUtils.setEndPublishDDLExecuteTimeMetadata(wus, System.currentTimeMillis());
}
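// Only after the publish DDL has succeeded is the workunit marked COMMITTED and its actual
// high watermark recorded, so a failed run does not advance the watermark.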
wus.setWorkingState(WorkingState.COMMITTED);
this.watermarker.setActualHighWatermark(wus);
// Emit an SLA event for conversion successful
if (!wus.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY)) {
EventWorkunitUtils.setIsFirstPublishMetadata(wus);
try {
new SlaEventSubmitter(eventSubmitter, EventConstants.CONVERSION_SUCCESSFUL_SLA_EVENT, wus.getProperties())
.submit();
} catch (Exception e) {
log.error("Failed while emitting SLA event, but ignoring and moving forward to curate " + "all clean up commands", e);
}
if (LineageUtils.shouldSetLineageInfo(wus)) {
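// Attach destination lineage metadata to the workunit so lineage events can be emitted
// for the converted dataset.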
setDestLineageInfo(wus, this.lineageInfo);
}
}
}
}
} finally {
/////////////////////////////////////////
// Preserving partition params
/////////////////////////////////////////
preservePartitionParams(states);
/////////////////////////////////////////
// Post publish cleanup
/////////////////////////////////////////
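// Cleanup runs whether the publish succeeded or failed; errors here are logged and
// swallowed so they never mask the outcome of the publish itself.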
// Execute cleanup commands
try {
executeQueries(Lists.newArrayList(cleanUpQueries));
} catch (Exception e) {
log.error("Failed to cleanup staging entities in Hive metastore.", e);
}
try {
deleteDirectories(directoriesToDelete);
} catch (Exception e) {
log.error("Failed to cleanup staging directories.", e);
}
}
}