in metacat-main/src/main/java/com/netflix/metacat/main/services/notifications/sns/SNSNotificationServiceUtil.java [94:163]
public TablePartitionsUpdatePayload createTablePartitionsUpdatePayload(
final List<PartitionDto> partitionDtos,
final MetacatEvent event) {
final List<String> deleteColumnValues;
String latestDeleteColumnValue = null;
String message;
try {
final Optional<ObjectNode> objectNode = this.userMetadataService.getDefinitionMetadata(
QualifiedName.ofTable(event.getName().getCatalogName(), event.getName().getDatabaseName(),
event.getName().getTableName()));
//Mark as missing metadata if any of delete column or partition column data type is missing
if (objectNode.isPresent()
&& !objectNode.get().at(DELETION_COLUMN_PATH).isMissingNode()
&& !objectNode.get().at(PARTITION_COLUMN_DATA_TYPE_PATH).isMissingNode()) {
final String deleteColumn = objectNode.get().at(DELETION_COLUMN_PATH).textValue();
//Mark with message empty delete column and return
if (StringUtils.isEmpty(deleteColumn)) {
return new TablePartitionsUpdatePayload(
null,
partitionDtos.size(),
0,
SNSNotificationPartitionAddMsg.EMPTY_DELETE_COLUMN.name(),
getPartitionNameListFromDtos(partitionDtos)
);
}
deleteColumnValues = getSortedDeletionPartitionKeys(partitionDtos, deleteColumn);
//Calculate the latest partition key from candidates
if (deleteColumnValues != null && !deleteColumnValues.isEmpty()) {
message = SNSNotificationPartitionAddMsg.ALL_FUTURE_PARTITION_KEYS.name();
//using utc now as today
final long nowSecond = Instant.now().getEpochSecond();
final boolean regional = PST_TIME.contains(
objectNode.get().at(PARTITION_COLUMN_DATA_TYPE_PATH).textValue());
//convert the value to utc then compare
for (String val : deleteColumnValues) {
try {
final Long timestamp = getTimeStamp(val, regional);
if (timestamp <= nowSecond) {
latestDeleteColumnValue = deleteColumn + "=" + val; //the delete column with value
message = SNSNotificationPartitionAddMsg.ATTACHED_VALID_PARITITION_KEY.name();
break;
}
} catch (ParseException ex) {
message = SNSNotificationPartitionAddMsg.INVALID_PARTITION_KEY_FORMAT.name();
log.debug("Failure of getting latest key due to invalid timestamp format {} {}:{}",
event.getName().getTableName(), deleteColumn, val);
break;
}
}
} else {
message = SNSNotificationPartitionAddMsg.NO_CANDIDATE_PARTITION_KEYS.name();
}
} else {
message = SNSNotificationPartitionAddMsg.MISSING_METADATA_INFO_FOR_PARTITION_KEY.name();
}
} catch (Exception ex) {
message = SNSNotificationPartitionAddMsg.FAILURE_OF_GET_LATEST_PARTITION_KEY.name();
log.error("Failure of createTablePartitionsUpdatePayload", ex.getMessage());
}
return new TablePartitionsUpdatePayload(
latestDeleteColumnValue,
partitionDtos.size(),
0,
message,
getPartitionNameListFromDtos(partitionDtos)
);
}