in service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java [2304:2445]
/**
 * Applies one notification to the table-like entity named by {@code tableIdentifier}.
 *
 * <ul>
 *   <li>{@code DROP}: delegates to {@code dropTableLike} with purge disabled and returns whether
 *       that drop reported success.
 *   <li>{@code VALIDATE}: performs checks only. It locates the deepest resolvable ancestor
 *       namespace, validates the proposed metadata location against it and builds a read-only
 *       FileIO for the metadata directory.
 *   <li>{@code CREATE} / {@code UPDATE}: creates any missing parent namespaces, rejects
 *       notifications whose timestamp is not newer than the last admitted one, reads the
 *       metadata file, validates its locations, and then creates or updates the entity.
 * </ul>
 *
 * <p>Any other notification type falls through and is reported as handled.
 *
 * <p>NOTE(review): the entity lookup honours {@code subType}, but the drop call and newly built
 * entities always use {@code PolarisEntitySubType.ICEBERG_TABLE} -- confirm callers only pass
 * that sub type.
 *
 * @param subType sub type used when resolving the existing entity for {@code tableIdentifier}
 * @param tableIdentifier identifier of the table the notification refers to
 * @param request the notification; its type must be non-null (enforced by a precondition check)
 * @return the drop outcome for {@code DROP}; {@code true} for every other path that completes
 *     without throwing
 * @throws BadRequestException in {@code VALIDATE} mode when no entity carrying storage info can
 *     be found for the identifier
 * @throws AlreadyExistsException in {@code CREATE}/{@code UPDATE} mode when the notification
 *     timestamp is less than or equal to the last admitted timestamp of the existing entity
 */
private boolean sendNotificationForTableLike(
    PolarisEntitySubType subType, TableIdentifier tableIdentifier, NotificationRequest request) {
  LOGGER.debug(
      "Handling notification request {} for tableIdentifier {}", request, tableIdentifier);

  // Resolved before dispatching on the notification type; only the CREATE/UPDATE path reads it.
  PolarisResolvedPathWrapper resolvedTable =
      resolvedEntityView.getPassthroughResolvedPath(
          tableIdentifier, PolarisEntityType.TABLE_LIKE, subType);

  NotificationType kind = request.getNotificationType();
  Preconditions.checkNotNull(kind, "Expected a valid notification type.");

  // ---- DROP ----
  if (kind == NotificationType.DROP) {
    return dropTableLike(
            PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, Map.of(), false /* purge */)
        .isSuccess();
  }

  // ---- VALIDATE ----
  if (kind == NotificationType.VALIDATE) {
    // Validation must not mutate anything, so missing parent namespaces are not created here.
    // Instead, walk from the table's full namespace towards the root (the zero-level namespace
    // is tried last) and stop at the first level that resolves; storage info is then looked up
    // from that resolved path.
    Namespace tableNamespace = tableIdentifier.namespace();
    PolarisResolvedPathWrapper storageAnchor = null;
    Optional<PolarisEntity> storageInfo = Optional.empty();
    int depth = tableNamespace.length();
    while (depth >= 0 && storageAnchor == null) {
      Namespace candidate = Namespace.of(Arrays.copyOf(tableNamespace.levels(), depth));
      storageAnchor = resolvedEntityView.getResolvedPath(candidate);
      depth--;
    }
    if (storageAnchor != null) {
      storageInfo = FileIOUtil.findStorageInfoFromHierarchy(storageAnchor);
    }
    if (storageAnchor == null || storageInfo.isEmpty()) {
      throw new BadRequestException(
          "Failed to find StorageInfo entity for TableIdentifier %s", tableIdentifier);
    }

    // Check the proposed metadata location against the resolved ancestor.
    String proposedLocation =
        transformTableLikeLocation(request.getPayload().getMetadataLocation());
    validateLocationForTableLike(tableIdentifier, proposedLocation, storageAnchor);

    // Make sure a FileIO can be constructed for the directory holding the metadata file; the
    // resulting FileIO itself is not needed.
    int lastSlash = proposedLocation.lastIndexOf("/");
    String metadataDir = proposedLocation.substring(0, lastSlash);
    loadFileIOForTableLike(
        tableIdentifier,
        Set.of(metadataDir),
        storageAnchor,
        new HashMap<>(tableDefaultProperties),
        Set.of(PolarisStorageActions.READ));
    LOGGER.debug(
        "Successful VALIDATE notification for tableIdentifier {}, metadataLocation {}",
        tableIdentifier,
        proposedLocation);
    return true;
  }

  // Anything that is neither CREATE nor UPDATE needs no further work.
  if (kind != NotificationType.CREATE && kind != NotificationType.UPDATE) {
    return true;
  }

  // ---- CREATE / UPDATE ----
  Namespace parentNamespace = tableIdentifier.namespace();
  createNonExistingNamespaces(parentNamespace);
  PolarisResolvedPathWrapper parentPath =
      resolvedEntityView.getPassthroughResolvedPath(parentNamespace);

  IcebergTableLikeEntity current =
      IcebergTableLikeEntity.of(
          resolvedTable != null ? resolvedTable.getRawLeafEntity() : null);
  String newLocation = transformTableLikeLocation(request.getPayload().getMetadataLocation());

  // priorLocation doubles as the create-vs-update switch further down: null means "create".
  String priorLocation;
  IcebergTableLikeEntity pending;
  if (current != null) {
    // Refuse out-of-order notifications: the incoming timestamp has to be strictly newer than
    // the last one admitted for this entity.
    if (current.getLastAdmittedNotificationTimestamp().isPresent()
        && request.getPayload().getTimestamp()
            <= current.getLastAdmittedNotificationTimestamp().get()) {
      throw new AlreadyExistsException(
          "A notification with a newer timestamp has been processed for table %s",
          tableIdentifier);
    }
    priorLocation = current.getMetadataLocation();
    pending =
        new IcebergTableLikeEntity.Builder(current)
            .setMetadataLocation(newLocation)
            .setLastNotificationTimestamp(request.getPayload().getTimestamp())
            .build();
  } else {
    priorLocation = null;
    pending =
        new IcebergTableLikeEntity.Builder(tableIdentifier, newLocation)
            .setCatalogId(getCatalogId())
            .setSubType(PolarisEntitySubType.ICEBERG_TABLE)
            .setId(
                getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
            .setLastNotificationTimestamp(request.getPayload().getTimestamp())
            .build();
  }

  // Step 1: the metadata file location itself must be acceptable and readable.
  validateLocationForTableLike(tableIdentifier, newLocation);
  int lastSlash = newLocation.lastIndexOf("/");
  String metadataDir = newLocation.substring(0, lastSlash);
  FileIO metadataIO =
      loadFileIOForTableLike(
          tableIdentifier,
          Set.of(metadataDir),
          parentPath,
          new HashMap<>(tableDefaultProperties),
          Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE));
  TableMetadata tableMetadata = TableMetadataParser.read(metadataIO, newLocation);

  // Step 2: the table location recorded inside the metadata must be acceptable as well.
  validateLocationForTableLike(tableIdentifier, tableMetadata.location());

  // Step 3: the metadata file has to live inside the table directory.
  validateMetadataFileInTableDir(
      tableIdentifier,
      tableMetadata,
      CatalogEntity.of(parentPath.getRawFullPath().getFirst()));

  // TODO: These might fail due to concurrent update; we need to do a retry in those cases.
  if (priorLocation != null) {
    LOGGER.debug(
        "Updating table {} for notification with metadataLocation {}",
        tableIdentifier,
        newLocation);
    updateTableLike(tableIdentifier, pending);
  } else {
    LOGGER.debug(
        "Creating table {} for notification with metadataLocation {}",
        tableIdentifier,
        newLocation);
    createTableLike(tableIdentifier, pending, parentPath);
  }
  return true;
}