public void commitTransaction()

in service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java [839:943]


  /**
   * Applies every table change in the request and commits the resulting metadata updates together.
   *
   * <p>The flow is:
   *
   * <ol>
   *   <li>Authorize {@code COMMIT_TRANSACTION} against the identifiers of all table changes.
   *   <li>Reject the request for static-facade catalogs and for base catalogs that are not an
   *       {@link IcebergCatalog}.
   *   <li>Point the base catalog at a {@link TransactionWorkspaceMetaStoreManager} so that the
   *       per-table commits below are collected as pending updates.
   *   <li>For each change: load the table, validate the change's requirements against the current
   *       metadata, apply its updates to a metadata builder, and commit if anything changed.
   *   <li>Submit all pending updates through a single {@code
   *       updateEntitiesPropertiesIfNotChanged} call on the real metastore manager.
   * </ol>
   *
   * @param commitTransactionRequest the request carrying the per-table changes to apply
   * @throws BadRequestException if the catalog is a static facade, the base catalog is not an
   *     {@link IcebergCatalog}, a change is a staged create, or a change moves the table location
   *     while {@code ALLOW_NAMESPACE_LOCATION_OVERLAP} is not enabled
   * @throws IllegalStateException if a loaded table is not a {@link BaseTable}
   * @throws CommitFailedException if the final combined update does not succeed
   */
  public void commitTransaction(CommitTransactionRequest commitTransactionRequest) {
    PolarisAuthorizableOperation op = PolarisAuthorizableOperation.COMMIT_TRANSACTION;
    // TODO: The authz actually needs to detect hidden updateForStagedCreate UpdateTableRequests
    // and have some kind of per-item conditional privilege requirement if we want to make it
    // so that only the stageCreate updates need TABLE_CREATE whereas everything else only
    // needs TABLE_WRITE_PROPERTIES.
    authorizeCollectionOfTableLikeOperationOrThrow(
        op,
        PolarisEntitySubType.ICEBERG_TABLE,
        commitTransactionRequest.tableChanges().stream()
            .map(UpdateTableRequest::identifier)
            .toList());
    CatalogEntity catalog =
        CatalogEntity.of(
            resolutionManifest
                .getResolvedReferenceCatalogEntity()
                .getResolvedLeafEntity()
                .getEntity());
    if (isStaticFacade(catalog)) {
      throw new BadRequestException("Cannot update table on static-facade external catalogs.");
    }

    if (!(baseCatalog instanceof IcebergCatalog transactionalCatalog)) {
      throw new BadRequestException(
          "Unsupported operation: commitTransaction with baseCatalog type: %s",
          baseCatalog.getClass().getName());
    }

    // Swap in TransactionWorkspaceMetaStoreManager for all mutations made by this baseCatalog to
    // only go into an in-memory collection that we can commit as a single atomic unit after all
    // validations.
    TransactionWorkspaceMetaStoreManager transactionMetaStoreManager =
        new TransactionWorkspaceMetaStoreManager(metaStoreManager);
    transactionalCatalog.setMetaStoreManager(transactionMetaStoreManager);

    for (UpdateTableRequest change : commitTransactionRequest.tableChanges()) {
      Table table = baseCatalog.loadTable(change.identifier());
      if (!(table instanceof BaseTable baseTable)) {
        throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
      }
      // NOTE(review): this check runs after loadTable, so a staged create for a table that does
      // not exist yet presumably fails inside loadTable before reaching here — confirm whether
      // the check should run first.
      if (isCreate(change)) {
        throw new BadRequestException(
            "Unsupported operation: commitTransaction with updateForStagedCreate: %s", change);
      }

      TableOperations tableOps = baseTable.operations();
      TableMetadata currentMetadata = tableOps.current();

      // Validate requirements; any CommitFailedExceptions will fail the overall request
      change.requirements().forEach(requirement -> requirement.validate(currentMetadata));

      // Apply changes
      TableMetadata.Builder metadataBuilder = TableMetadata.buildFrom(currentMetadata);
      for (MetadataUpdate singleUpdate : change.updates()) {
        // Note: If location-overlap checking is refactored to be atomic, we could
        // support validation within a single multi-table transaction as well, but
        // will need to update the TransactionWorkspaceMetaStoreManager to better
        // expose the concept of being able to read uncommitted updates.
        //
        // A SetLocation is only rejected when it actually changes the location and the
        // ALLOW_NAMESPACE_LOCATION_OVERLAP configuration is not enabled.
        if (singleUpdate instanceof MetadataUpdate.SetLocation setLocation
            && !currentMetadata.location().equals(setLocation.location())
            && !callContext
                .getPolarisCallContext()
                .getConfigurationStore()
                .getConfiguration(
                    callContext.getPolarisCallContext(),
                    FeatureConfiguration.ALLOW_NAMESPACE_LOCATION_OVERLAP)) {
          throw new BadRequestException(
              "Unsupported operation: commitTransaction containing SetLocation"
                  + " for table '%s' and new location '%s'",
              change.identifier(),
              setLocation.location());
        }

        // Apply updates to builder
        singleUpdate.applyTo(metadataBuilder);
      }

      // Commit into transaction workspace we swapped the baseCatalog to use; skip the commit
      // entirely when the updates produced no metadata changes.
      TableMetadata updatedMetadata = metadataBuilder.build();
      if (!updatedMetadata.changes().isEmpty()) {
        tableOps.commit(currentMetadata, updatedMetadata);
      }
    }

    // Commit the collected updates in a single atomic operation
    List<EntityWithPath> pendingUpdates = transactionMetaStoreManager.getPendingUpdates();
    EntitiesResult result =
        metaStoreManager.updateEntitiesPropertiesIfNotChanged(
            callContext.getPolarisCallContext(), pendingUpdates);
    if (!result.isSuccess()) {
      // TODO: Retries and server-side cleanup on failure
      throw new CommitFailedException(
          "Transaction commit failed with status: %s, extraInfo: %s",
          result.getReturnStatus(), result.getExtraInformation());
    }
  }