in service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java [1359:1518]
public void doCommit(TableMetadata base, TableMetadata metadata) {
  // Commits `metadata` as the new state of `tableIdentifier`.
  //
  // `base` is the metadata the caller believes is current; it is null when the table is being
  // created. The steps are, in order:
  //   1. fire the before-commit event,
  //   2. resolve the table (or, if it does not exist yet, its namespace) and load a FileIO,
  //   3. validate the table/data/metadata locations when any of them differs from `base`,
  //   4. write the new metadata file if required,
  //   5. compare the stored metadata location against `base` to detect concurrent changes,
  //   6. create or update the table entity and fire the after-commit event.
  //
  // Throws NoSuchNamespaceException when creating a table in a missing namespace,
  // AlreadyExistsException when a view, generic table or (on create) a table already uses the
  // name, NoSuchTableException when an update targets a table with no stored metadata location,
  // and CommitFailedException when the stored metadata location no longer matches `base`.
  polarisEventListener.onBeforeTableCommited(
      new BeforeTableCommitedEvent(tableIdentifier, base, metadata));

  LOGGER.debug(
      "doCommit for table {} with base {}, metadata {}", tableIdentifier, base, metadata);
  // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict
  if (null == base && !namespaceExists(tableIdentifier.namespace())) {
    throw new NoSuchNamespaceException(
        "Cannot create table '%s'. Namespace does not exist: '%s'",
        tableIdentifier, tableIdentifier.namespace());
  }

  PolarisResolvedPathWrapper resolvedTableEntities =
      resolvedEntityView.getPassthroughResolvedPath(
          tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE);

  // Fetch credentials for the resolved entity. The entity could be the table itself (if it has
  // already been stored and credentials have been configured directly) or it could be the
  // table's namespace or catalog.
  PolarisResolvedPathWrapper resolvedStorageEntity =
      resolvedTableEntities == null
          ? resolvedEntityView.getResolvedPath(tableIdentifier.namespace())
          : resolvedTableEntities;

  // refresh credentials because we need to read the metadata file to validate its location
  tableFileIO =
      loadFileIOForTableLike(
          tableIdentifier,
          getLocationsAllowedToBeAccessed(metadata),
          resolvedStorageEntity,
          new HashMap<>(metadata.properties()),
          Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE));

  // When the table is not resolved, resolvedStorageEntity already holds the namespace path, so
  // reuse it instead of resolving the namespace a second time.
  List<PolarisEntity> resolvedNamespace =
      resolvedTableEntities == null
          ? resolvedStorageEntity.getRawFullPath()
          : resolvedTableEntities.getRawParentPath();
  CatalogEntity catalog = CatalogEntity.of(resolvedNamespace.getFirst());

  // User-specified write locations requested by the new metadata; null when not set.
  String writeDataLocation =
      metadata.properties().get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY);
  String writeMetadataLocation =
      metadata
          .properties()
          .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY);

  // Every location that is validated below must also take part in the change detection;
  // otherwise a commit that only changes the write-metadata location would skip validation.
  boolean locationsChanged =
      base == null
          || !metadata.location().equals(base.location())
          || !Objects.equal(
              base.properties().get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY),
              writeDataLocation)
          || !Objects.equal(
              base.properties()
                  .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY),
              writeMetadataLocation);

  if (locationsChanged) {
    // If location is changing then we must validate that the requested location is valid
    // for the storage configuration inherited under this entity's path.
    Set<String> dataLocations = new HashSet<>();
    dataLocations.add(metadata.location());
    if (writeDataLocation != null) {
      dataLocations.add(writeDataLocation);
    }
    if (writeMetadataLocation != null) {
      dataLocations.add(writeMetadataLocation);
    }
    validateLocationsForTableLike(tableIdentifier, dataLocations, resolvedStorageEntity);

    // also validate that the table location doesn't overlap an existing table
    dataLocations.forEach(
        location ->
            validateNoLocationOverlap(
                catalogEntity,
                tableIdentifier,
                resolvedNamespace,
                location,
                resolvedStorageEntity.getRawLeafEntity()));

    // and that the metadata file points to a location within the table's directory structure
    if (metadata.metadataFileLocation() != null) {
      validateMetadataFileInTableDir(tableIdentifier, metadata, catalog);
    }
  }

  String newLocation = writeNewMetadataIfRequired(base == null, metadata);
  String oldLocation = base == null ? null : base.metadataFileLocation();

  // TODO: Consider using the entity from doRefresh() directly to do the conflict detection
  // instead of a two-layer CAS (checking metadataLocation to detect concurrent modification
  // between doRefresh() and doCommit(), and then updateEntityPropertiesIfNotChanged to detect
  // concurrent modification between our checking of unchanged metadataLocation here and actual
  // persistence-layer commit).
  PolarisResolvedPathWrapper resolvedPath =
      resolvedEntityView.getPassthroughResolvedPath(
          tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ANY_SUBTYPE);

  // The lookup above accepts any subtype, so reject names already taken by a view or a generic
  // table before treating the entity as an Iceberg table.
  if (resolvedPath != null && resolvedPath.getRawLeafEntity() != null) {
    if (resolvedPath.getRawLeafEntity().getSubType() == PolarisEntitySubType.ICEBERG_VIEW) {
      throw new AlreadyExistsException(
          "View with same name already exists: %s", tableIdentifier);
    } else if (resolvedPath.getRawLeafEntity().getSubType()
        == PolarisEntitySubType.GENERIC_TABLE) {
      throw new AlreadyExistsException(
          "Generic table with same name already exists: %s", tableIdentifier);
    }
  }

  IcebergTableLikeEntity entity =
      IcebergTableLikeEntity.of(resolvedPath == null ? null : resolvedPath.getRawLeafEntity());
  String existingLocation;
  if (null == entity) {
    // No stored entity: build a brand-new one pointing at the freshly written metadata.
    existingLocation = null;
    entity =
        new IcebergTableLikeEntity.Builder(tableIdentifier, newLocation)
            .setCatalogId(getCatalogId())
            .setSubType(PolarisEntitySubType.ICEBERG_TABLE)
            .setBaseLocation(metadata.location())
            .setId(
                getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
            .build();
  } else {
    // Stored entity found: remember its metadata location for the conflict check, then derive
    // the updated entity from it.
    existingLocation = entity.getMetadataLocation();
    entity =
        new IcebergTableLikeEntity.Builder(entity)
            .setBaseLocation(metadata.location())
            .setMetadataLocation(newLocation)
            .build();
  }

  // The stored metadata location must still be the one `base` was read from.
  if (!Objects.equal(existingLocation, oldLocation)) {
    if (null == base) {
      throw new AlreadyExistsException("Table already exists: %s", fullTableName);
    }

    if (null == existingLocation) {
      throw new NoSuchTableException("Table does not exist: %s", fullTableName);
    }

    throw new CommitFailedException(
        "Cannot commit to table %s metadata location from %s to %s "
            + "because it has been concurrently modified to %s",
        tableIdentifier, oldLocation, newLocation, existingLocation);
  }

  // We diverge from `BaseMetastoreTableOperations` in the below code block
  if (makeMetadataCurrentOnCommit) {
    currentMetadata =
        TableMetadata.buildFrom(metadata)
            .withMetadataLocation(newLocation)
            .discardChanges()
            .build();
    currentMetadataLocation = newLocation;
  }

  if (null == existingLocation) {
    createTableLike(tableIdentifier, entity);
  } else {
    updateTableLike(tableIdentifier, entity);
  }

  polarisEventListener.onAfterTableCommited(
      new AfterTableCommitedEvent(tableIdentifier, base, metadata));
}