in service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java [839:943]
/**
 * Applies every table change in a multi-table commit request and persists the results as a single
 * atomic metastore update.
 *
 * <p>The method authorizes the operation against all referenced table identifiers, swaps the base
 * catalog's metastore manager for a {@link TransactionWorkspaceMetaStoreManager} so that per-table
 * commits are only collected in memory, validates and applies each change, and finally submits
 * the collected pending updates in one {@code updateEntitiesPropertiesIfNotChanged} call.
 *
 * @param commitTransactionRequest request whose {@code tableChanges()} are validated and applied
 * @throws BadRequestException if the catalog is a static facade, if {@code baseCatalog} is not an
 *     {@link IcebergCatalog}, if a change is a staged create, or if a {@code SetLocation} update
 *     changes the table location while {@code ALLOW_NAMESPACE_LOCATION_OVERLAP} is not enabled
 * @throws IllegalStateException if a loaded table is not a {@link BaseTable}
 * @throws CommitFailedException if the final atomic update does not report success
 */
public void commitTransaction(CommitTransactionRequest commitTransactionRequest) {
  PolarisAuthorizableOperation op = PolarisAuthorizableOperation.COMMIT_TRANSACTION;
  // TODO: The authz actually needs to detect hidden updateForStagedCreate UpdateTableRequests
  // and have some kind of per-item conditional privilege requirement if we want to make it
  // so that only the stageCreate updates need TABLE_CREATE whereas everything else only
  // needs TABLE_WRITE_PROPERTIES.
  authorizeCollectionOfTableLikeOperationOrThrow(
      op,
      PolarisEntitySubType.ICEBERG_TABLE,
      commitTransactionRequest.tableChanges().stream()
          .map(UpdateTableRequest::identifier)
          .toList());

  CatalogEntity catalog =
      CatalogEntity.of(
          resolutionManifest
              .getResolvedReferenceCatalogEntity()
              .getResolvedLeafEntity()
              .getEntity());
  if (isStaticFacade(catalog)) {
    throw new BadRequestException("Cannot update table on static-facade external catalogs.");
  }
  if (!(baseCatalog instanceof IcebergCatalog icebergCatalog)) {
    throw new BadRequestException(
        "Unsupported operation: commitTransaction with baseCatalog type: %s",
        baseCatalog.getClass().getName());
  }

  // Swap in TransactionWorkspaceMetaStoreManager for all mutations made by this baseCatalog to
  // only go into an in-memory collection that we can commit as a single atomic unit after all
  // validations.
  TransactionWorkspaceMetaStoreManager transactionMetaStoreManager =
      new TransactionWorkspaceMetaStoreManager(metaStoreManager);
  icebergCatalog.setMetaStoreManager(transactionMetaStoreManager);

  for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
    Table table = baseCatalog.loadTable(change.identifier());
    if (!(table instanceof BaseTable baseTable)) {
      throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
    }
    // NOTE(review): this check runs after loadTable, so a staged create for a table that does
    // not exist yet presumably fails inside loadTable before reaching here - confirm whether
    // the check should move ahead of the load.
    if (isCreate(change)) {
      throw new BadRequestException(
          "Unsupported operation: commitTransaction with updateForStagedCreate: %s", change);
    }

    TableOperations tableOps = baseTable.operations();
    TableMetadata currentMetadata = tableOps.current();

    // Validate requirements; any CommitFailedExceptions will fail the overall request
    change.requirements().forEach(requirement -> requirement.validate(currentMetadata));

    // Apply changes
    TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(currentMetadata);
    for (MetadataUpdate singleUpdate : change.updates()) {
      // Note: If location-overlap checking is refactored to be atomic, we could
      // support validation within a single multi-table transaction as well, but
      // will need to update the TransactionWorkspaceMetaStoreManager to better
      // expose the concept of being able to read uncommitted updates.
      if (singleUpdate instanceof MetadataUpdate.SetLocation setLocation) {
        boolean locationChanged = !currentMetadata.location().equals(setLocation.location());
        // The configuration lookup stays behind the short-circuit so it is only consulted
        // when an update actually attempts to move the table.
        if (locationChanged
            && !callContext
                .getPolarisCallContext()
                .getConfigurationStore()
                .getConfiguration(
                    callContext.getPolarisCallContext(),
                    FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
          throw new BadRequestException(
              "Unsupported operation: commitTransaction containing SetLocation"
                  + " for table '%s' and new location '%s'",
              change.identifier(),
              setLocation.location());
        }
      }
      // Apply updates to builder
      singleUpdate.applyTo(metadataBuilder);
    }

    // Commit into transaction workspace we swapped the baseCatalog to use; changes that
    // produce no metadata differences are skipped.
    TableMetadata updatedMetadata = metadataBuilder.build();
    if (!updatedMetadata.changes().isEmpty()) {
      tableOps.commit(currentMetadata, updatedMetadata);
    }
  }

  // Commit the collected updates in a single atomic operation
  List<EntityWithPath> pendingUpdates = transactionMetaStoreManager.getPendingUpdates();
  EntitiesResult result =
      metaStoreManager.updateEntitiesPropertiesIfNotChanged(
          callContext.getPolarisCallContext(), pendingUpdates);
  if (!result.isSuccess()) {
    // TODO: Retries and server-side cleanup on failure
    throw new CommitFailedException(
        "Transaction commit failed with status: %s, extraInfo: %s",
        result.getReturnStatus(), result.getExtraInformation());
  }
}