private boolean sendNotificationForTableLike()

in service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java [2304:2445]


  private boolean sendNotificationForTableLike(
      PolarisEntitySubType subType, TableIdentifier tableIdentifier, NotificationRequest request) {
    LOGGER.debug(
        "Handling notification request {} for tableIdentifier {}", request, tableIdentifier);
    PolarisResolvedPathWrapper resolvedEntities =
        resolvedEntityView.getPassthroughResolvedPath(
            tableIdentifier, PolarisEntityType.TABLE_LIKE, subType);

    NotificationType notificationType = request.getNotificationType();

    Preconditions.checkNotNull(notificationType, "Expected a valid notification type.");

    if (notificationType == NotificationType.DROP) {
      return dropTableLike(
              PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier, Map.of(), false /* purge */)
          .isSuccess();
    } else if (notificationType == NotificationType.VALIDATE) {
      // In this mode we don't want to make any mutations, so we won't auto-create non-existing
      // parent namespaces. This means when we want to validate allowedLocations for the proposed
      // table metadata location, we must independently find the deepest non-null parent namespace
      // of the TableIdentifier, which may even be the base CatalogEntity if no parent namespaces
      // actually exist yet. We can then extract the right StorageInfo entity via a normal call
      // to findStorageInfoFromHierarchy.
      PolarisResolvedPathWrapper resolvedStorageEntity = null;
      Optional<PolarisEntity> storageInfoEntity = Optional.empty();
      for (int i = tableIdentifier.namespace().length(); i >= 0; i--) {
        Namespace nsLevel =
            Namespace.of(
                Arrays.stream(tableIdentifier.namespace().levels())
                    .limit(i)
                    .toArray(String[]::new));
        resolvedStorageEntity = resolvedEntityView.getResolvedPath(nsLevel);
        if (resolvedStorageEntity != null) {
          storageInfoEntity = FileIOUtil.findStorageInfoFromHierarchy(resolvedStorageEntity);
          break;
        }
      }

      if (resolvedStorageEntity == null || storageInfoEntity.isEmpty()) {
        throw new BadRequestException(
            "Failed to find StorageInfo entity for TableIdentifier %s", tableIdentifier);
      }

      // Validate location against the resolvedStorageEntity
      String metadataLocation =
          transformTableLikeLocation(request.getPayload().getMetadataLocation());
      validateLocationForTableLike(tableIdentifier, metadataLocation, resolvedStorageEntity);

      // Validate that we can construct a FileIO
      String locationDir = metadataLocation.substring(0, metadataLocation.lastIndexOf("/"));
      loadFileIOForTableLike(
          tableIdentifier,
          Set.of(locationDir),
          resolvedStorageEntity,
          new HashMap<>(tableDefaultProperties),
          Set.of(PolarisStorageActions.READ));

      LOGGER.debug(
          "Successful VALIDATE notification for tableIdentifier {}, metadataLocation {}",
          tableIdentifier,
          metadataLocation);
    } else if (notificationType == NotificationType.CREATE
        || notificationType == NotificationType.UPDATE) {

      Namespace ns = tableIdentifier.namespace();
      createNonExistingNamespaces(ns);

      PolarisResolvedPathWrapper resolvedParent = resolvedEntityView.getPassthroughResolvedPath(ns);

      IcebergTableLikeEntity entity =
          IcebergTableLikeEntity.of(
              resolvedEntities == null ? null : resolvedEntities.getRawLeafEntity());

      String existingLocation;
      String newLocation = transformTableLikeLocation(request.getPayload().getMetadataLocation());
      if (null == entity) {
        existingLocation = null;
        entity =
            new IcebergTableLikeEntity.Builder(tableIdentifier, newLocation)
                .setCatalogId(getCatalogId())
                .setSubType(PolarisEntitySubType.ICEBERG_TABLE)
                .setId(
                    getMetaStoreManager().generateNewEntityId(getCurrentPolarisContext()).getId())
                .setLastNotificationTimestamp(request.getPayload().getTimestamp())
                .build();
      } else {
        // If the notification timestamp is out-of-order, we should not update the table
        if (entity.getLastAdmittedNotificationTimestamp().isPresent()
            && request.getPayload().getTimestamp()
                <= entity.getLastAdmittedNotificationTimestamp().get()) {
          throw new AlreadyExistsException(
              "A notification with a newer timestamp has been processed for table %s",
              tableIdentifier);
        }
        existingLocation = entity.getMetadataLocation();
        entity =
            new IcebergTableLikeEntity.Builder(entity)
                .setMetadataLocation(newLocation)
                .setLastNotificationTimestamp(request.getPayload().getTimestamp())
                .build();
      }
      // first validate we can read the metadata file
      validateLocationForTableLike(tableIdentifier, newLocation);

      String locationDir = newLocation.substring(0, newLocation.lastIndexOf("/"));

      FileIO fileIO =
          loadFileIOForTableLike(
              tableIdentifier,
              Set.of(locationDir),
              resolvedParent,
              new HashMap<>(tableDefaultProperties),
              Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE));
      TableMetadata tableMetadata = TableMetadataParser.read(fileIO, newLocation);

      // then validate that it points to a valid location for this table
      validateLocationForTableLike(tableIdentifier, tableMetadata.location());

      // finally, validate that the metadata file is within the table directory
      validateMetadataFileInTableDir(
          tableIdentifier,
          tableMetadata,
          CatalogEntity.of(resolvedParent.getRawFullPath().getFirst()));

      // TODO: These might fail due to concurrent update; we need to do a retry in those cases.
      if (null == existingLocation) {
        LOGGER.debug(
            "Creating table {} for notification with metadataLocation {}",
            tableIdentifier,
            newLocation);
        createTableLike(tableIdentifier, entity, resolvedParent);
      } else {
        LOGGER.debug(
            "Updating table {} for notification with metadataLocation {}",
            tableIdentifier,
            newLocation);

        updateTableLike(tableIdentifier, entity);
      }
    }
    return true;
  }