public void doCommit()

in service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java [1359:1518]


    public void doCommit(TableMetadata base, TableMetadata metadata) {
      polarisEventListener.onBeforeTableCommited(
          new BeforeTableCommitedEvent(tableIdentifier, base, metadata));

      LOGGER.debug(
          "doCommit for table {} with base {}, metadata {}", tableIdentifier, base, metadata);
      // TODO: Maybe avoid writing metadata if there's definitely a transaction conflict
      if (null == base && !namespaceExists(tableIdentifier.namespace())) {
        throw new NoSuchNamespaceException(
            "Cannot create table '%s'. Namespace does not exist: '%s'",
            tableIdentifier, tableIdentifier.namespace());
      }

      PolarisResolvedPathWrapper resolvedTableEntities =
          resolvedEntityView.getPassthroughResolvedPath(
              tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE);

      // Fetch credentials for the resolved entity. The entity could be the table itself (if it has
      // already been stored and credentials have been configured directly) or it could be the
      // table's namespace or catalog.
      PolarisResolvedPathWrapper resolvedStorageEntity =
          resolvedTableEntities == null
              ? resolvedEntityView.getResolvedPath(tableIdentifier.namespace())
              : resolvedTableEntities;

      // refresh credentials because we need to read the metadata file to validate its location
      tableFileIO =
          loadFileIOForTableLike(
              tableIdentifier,
              getLocationsAllowedToBeAccessed(metadata),
              resolvedStorageEntity,
              new HashMap<>(metadata.properties()),
              Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE));

      List<PolarisEntity> resolvedNamespace =
          resolvedTableEntities == null
              ? resolvedEntityView.getResolvedPath(tableIdentifier.namespace()).getRawFullPath()
              : resolvedTableEntities.getRawParentPath();
      CatalogEntity catalog = CatalogEntity.of(resolvedNamespace.getFirst());

      if (base == null
          || !metadata.location().equals(base.location())
          || !Objects.equal(
              base.properties().get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY),
              metadata
                  .properties()
                  .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY))) {
        // If location is changing then we must validate that the requested location is valid
        // for the storage configuration inherited under this entity's path.
        Set<String> dataLocations = new HashSet<>();
        dataLocations.add(metadata.location());
        if (metadata.properties().get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY)
            != null) {
          dataLocations.add(
              metadata
                  .properties()
                  .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY));
        }
        if (metadata
                .properties()
                .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY)
            != null) {
          dataLocations.add(
              metadata
                  .properties()
                  .get(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY));
        }
        validateLocationsForTableLike(tableIdentifier, dataLocations, resolvedStorageEntity);
        // also validate that the table location doesn't overlap an existing table
        dataLocations.forEach(
            location ->
                validateNoLocationOverlap(
                    catalogEntity,
                    tableIdentifier,
                    resolvedNamespace,
                    location,
                    resolvedStorageEntity.getRawLeafEntity()));
        // and that the metadata file points to a location within the table's directory structure
        if (metadata.metadataFileLocation() != null) {
          validateMetadataFileInTableDir(tableIdentifier, metadata, catalog);
        }
      }

      String newLocation = writeNewMetadataIfRequired(base == null, metadata);
      String oldLocation = base == null ? null : base.metadataFileLocation();

      // TODO: Consider using the entity from doRefresh() directly to do the conflict detection
      // instead of a two-layer CAS (checking metadataLocation to detect concurrent modification
      // between doRefresh() and doCommit(), and then updateEntityPropertiesIfNotChanged to detect
      // concurrent
      // modification between our checking of unchanged metadataLocation here and actual
      // persistence-layer commit).
      PolarisResolvedPathWrapper resolvedPath =
          resolvedEntityView.getPassthroughResolvedPath(
              tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ANY_SUBTYPE);
      if (resolvedPath != null && resolvedPath.getRawLeafEntity() != null) {
        if (resolvedPath.getRawLeafEntity().getSubType() == PolarisEntitySubType.ICEBERG_VIEW) {
          throw new AlreadyExistsException(
              "View with same name already exists: %s", tableIdentifier);
        } else if (resolvedPath.getRawLeafEntity().getSubType()
            == PolarisEntitySubType.GENERIC_TABLE) {
          throw new AlreadyExistsException(
              "Generic table with same name already exists: %s", tableIdentifier);
        }
      }
      IcebergTableLikeEntity entity =
          IcebergTableLikeEntity.of(resolvedPath == null ? null : resolvedPath.getRawLeafEntity());
      String existingLocation;
      if (null == entity) {
        existingLocation = null;
        entity =
            new IcebergTableLikeEntity.Builder(tableIdentifier, newLocation)
                .setCatalogId(getCatalogId())
                .setSubType(PolarisEntitySubType.ICEBERG_TABLE)
                .setBaseLocation(metadata.location())
                .setId(
                    getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
                .build();
      } else {
        existingLocation = entity.getMetadataLocation();
        entity =
            new IcebergTableLikeEntity.Builder(entity)
                .setBaseLocation(metadata.location())
                .setMetadataLocation(newLocation)
                .build();
      }
      if (!Objects.equal(existingLocation, oldLocation)) {
        if (null == base) {
          throw new AlreadyExistsException("Table already exists: %s", fullTableName);
        }

        if (null == existingLocation) {
          throw new NoSuchTableException("Table does not exist: %s", fullTableName);
        }

        throw new CommitFailedException(
            "Cannot commit to table %s metadata location from %s to %s "
                + "because it has been concurrently modified to %s",
            tableIdentifier, oldLocation, newLocation, existingLocation);
      }

      // We diverge from `BaseMetastoreTableOperations` in the below code block
      if (makeMetadataCurrentOnCommit) {
        currentMetadata =
            TableMetadata.buildFrom(metadata)
                .withMetadataLocation(newLocation)
                .discardChanges()
                .build();
        currentMetadataLocation = newLocation;
      }

      if (null == existingLocation) {
        createTableLike(tableIdentifier, entity);
      } else {
        updateTableLike(tableIdentifier, entity);
      }

      polarisEventListener.onAfterTableCommited(
          new AfterTableCommitedEvent(tableIdentifier, base, metadata));
    }