flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java [93:804]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
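/**
 * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
 *
 * <p>Flink databases are mapped to Iceberg namespaces nested under the configured base namespace,
 * and Flink tables are mapped to Iceberg tables within those namespaces. (Summary comment added
 * here; the class Javadoc sits above the excerpted line range in the source file.)
 */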
public class FlinkCatalog extends AbstractCatalog {

  private final CatalogLoader catalogLoader;
  private final Catalog icebergCatalog;
  private final Namespace baseNamespace;
  private final SupportsNamespaces asNamespaceCatalog;
  private final Closeable closeable;
  private final boolean cacheEnabled;

  public FlinkCatalog(
      String catalogName,
      String defaultDatabase,
      Namespace baseNamespace,
      CatalogLoader catalogLoader,
      boolean cacheEnabled,
      long cacheExpirationIntervalMs) {
    super(catalogName, defaultDatabase);
    this.catalogLoader = catalogLoader;
    this.baseNamespace = baseNamespace;
    this.cacheEnabled = cacheEnabled;

    Catalog originalCatalog = catalogLoader.loadCatalog();
    icebergCatalog =
        cacheEnabled
            ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
            : originalCatalog;
    asNamespaceCatalog =
        originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
    closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;

    FlinkEnvironmentContext.init();
  }

  @Override
  public void open() throws CatalogException {}

  @Override
  public void close() throws CatalogException {
    if (closeable != null) {
      try {
        closeable.close();
      } catch (IOException e) {
        throw new CatalogException(e);
      }
    }
  }

  public Catalog catalog() {
    return icebergCatalog;
  }

  /** Appends a new level to the base namespace. */
  private static Namespace appendLevel(Namespace baseNamespace, String newLevel) {
    String[] namespace = new String[baseNamespace.levels().length + 1];
    System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
    namespace[baseNamespace.levels().length] = newLevel;
    return Namespace.of(namespace);
  }

  TableIdentifier toIdentifier(ObjectPath path) {
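    // Iceberg metadata tables are exposed through a '$' naming convention
    // (e.g. `my_table$snapshots`), so split the object name on '$' to detect them.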
    String objectName = path.getObjectName();
    List<String> tableName = Splitter.on('$').splitToList(objectName);

    if (tableName.size() == 1) {
      return TableIdentifier.of(
          appendLevel(baseNamespace, path.getDatabaseName()), path.getObjectName());
    } else if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) {
      return TableIdentifier.of(
          appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), tableName.get(0)),
          tableName.get(1));
    } else {
      throw new IllegalArgumentException("Illegal table name: " + objectName);
    }
  }

  @Override
  public List<String> listDatabases() throws CatalogException {
    if (asNamespaceCatalog == null) {
      return Collections.singletonList(getDefaultDatabase());
    }

    return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
        .map(n -> n.level(n.levels().length - 1))
        .collect(Collectors.toList());
  }

  @Override
  public CatalogDatabase getDatabase(String databaseName)
      throws DatabaseNotExistException, CatalogException {
    if (asNamespaceCatalog == null) {
      if (!getDefaultDatabase().equals(databaseName)) {
        throw new DatabaseNotExistException(getName(), databaseName);
      } else {
        return new CatalogDatabaseImpl(Maps.newHashMap(), "");
      }
    } else {
      try {
        Map<String, String> metadata =
            Maps.newHashMap(
                asNamespaceCatalog.loadNamespaceMetadata(appendLevel(baseNamespace, databaseName)));
        String comment = metadata.remove("comment");
        return new CatalogDatabaseImpl(metadata, comment);
      } catch (NoSuchNamespaceException e) {
        throw new DatabaseNotExistException(getName(), databaseName, e);
      }
    }
  }

  @Override
  public boolean databaseExists(String databaseName) throws CatalogException {
    try {
      getDatabase(databaseName);
      return true;
    } catch (DatabaseNotExistException ignore) {
      return false;
    }
  }

  @Override
  public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
      throws DatabaseAlreadyExistException, CatalogException {
    createDatabase(
        name, mergeComment(database.getProperties(), database.getComment()), ignoreIfExists);
  }

  private void createDatabase(
      String databaseName, Map<String, String> metadata, boolean ignoreIfExists)
      throws DatabaseAlreadyExistException, CatalogException {
    if (asNamespaceCatalog != null) {
      try {
        asNamespaceCatalog.createNamespace(appendLevel(baseNamespace, databaseName), metadata);
      } catch (AlreadyExistsException e) {
        if (!ignoreIfExists) {
          throw new DatabaseAlreadyExistException(getName(), databaseName, e);
        }
      }
    } else {
      throw new UnsupportedOperationException(
          "Namespaces are not supported by catalog: " + getName());
    }
  }

  private Map<String, String> mergeComment(Map<String, String> metadata, String comment) {
    Map<String, String> ret = Maps.newHashMap(metadata);
    if (metadata.containsKey("comment")) {
      throw new CatalogException("Database properties should not contain key: 'comment'.");
    }

    if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
      ret.put("comment", comment);
    }
    return ret;
  }

  @Override
  public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
      throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
    if (asNamespaceCatalog != null) {
      try {
        boolean success = asNamespaceCatalog.dropNamespace(appendLevel(baseNamespace, name));
        if (!success && !ignoreIfNotExists) {
          throw new DatabaseNotExistException(getName(), name);
        }
      } catch (NoSuchNamespaceException e) {
        if (!ignoreIfNotExists) {
          throw new DatabaseNotExistException(getName(), name, e);
        }
      } catch (NamespaceNotEmptyException e) {
        throw new DatabaseNotEmptyException(getName(), name, e);
      }
    } else {
      if (!ignoreIfNotExists) {
        throw new DatabaseNotExistException(getName(), name);
      }
    }
  }

  @Override
  public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
      throws DatabaseNotExistException, CatalogException {
    if (asNamespaceCatalog != null) {
      Namespace namespace = appendLevel(baseNamespace, name);
      Map<String, String> updates = Maps.newHashMap();
      Set<String> removals = Sets.newHashSet();

      try {
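        // Diff the existing namespace metadata against the requested properties: keys missing
        // from the new set become removals, changed or added keys become updates.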
        Map<String, String> oldProperties = asNamespaceCatalog.loadNamespaceMetadata(namespace);
        Map<String, String> newProperties =
            mergeComment(newDatabase.getProperties(), newDatabase.getComment());

        for (String key : oldProperties.keySet()) {
          if (!newProperties.containsKey(key)) {
            removals.add(key);
          }
        }

        for (Map.Entry<String, String> entry : newProperties.entrySet()) {
          if (!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
            updates.put(entry.getKey(), entry.getValue());
          }
        }

        if (!updates.isEmpty()) {
          asNamespaceCatalog.setProperties(namespace, updates);
        }

        if (!removals.isEmpty()) {
          asNamespaceCatalog.removeProperties(namespace, removals);
        }

      } catch (NoSuchNamespaceException e) {
        if (!ignoreIfNotExists) {
          throw new DatabaseNotExistException(getName(), name, e);
        }
      }
    } else {
      if (getDefaultDatabase().equals(name)) {
        throw new CatalogException(
            "Cannot alter the default database when the Iceberg catalog doesn't support namespaces.");
      }
      if (!ignoreIfNotExists) {
        throw new DatabaseNotExistException(getName(), name);
      }
    }
  }

  @Override
  public List<String> listTables(String databaseName)
      throws DatabaseNotExistException, CatalogException {
    try {
      return icebergCatalog.listTables(appendLevel(baseNamespace, databaseName)).stream()
          .map(TableIdentifier::name)
          .collect(Collectors.toList());
    } catch (NoSuchNamespaceException e) {
      throw new DatabaseNotExistException(getName(), databaseName, e);
    }
  }

  @Override
  public CatalogTable getTable(ObjectPath tablePath)
      throws TableNotExistException, CatalogException {
    Table table = loadIcebergTable(tablePath);
    return toCatalogTable(table);
  }

  private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
    try {
      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
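      // A caching catalog may return a stale Table instance, so refresh it to make sure
      // callers observe the latest table metadata.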
      if (cacheEnabled) {
        table.refresh();
      }

      return table;
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      throw new TableNotExistException(getName(), tablePath, e);
    }
  }

  @Override
  public boolean tableExists(ObjectPath tablePath) throws CatalogException {
    return icebergCatalog.tableExists(toIdentifier(tablePath));
  }

  @Override
  public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
      throws TableNotExistException, CatalogException {
    try {
      icebergCatalog.dropTable(toIdentifier(tablePath));
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      if (!ignoreIfNotExists) {
        throw new TableNotExistException(getName(), tablePath, e);
      }
    }
  }

  @Override
  public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
      throws TableNotExistException, TableAlreadyExistException, CatalogException {
    try {
      icebergCatalog.renameTable(
          toIdentifier(tablePath),
          toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      if (!ignoreIfNotExists) {
        throw new TableNotExistException(getName(), tablePath, e);
      }
    } catch (AlreadyExistsException e) {
      throw new TableAlreadyExistException(getName(), tablePath, e);
    }
  }

  @Override
  public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
      throws CatalogException, TableAlreadyExistException {
    if (Objects.equals(
        table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
      throw new IllegalArgumentException(
          "Cannot create a table with the 'connector'='iceberg' property in an Iceberg catalog. "
              + "Either create the table with the 'connector'='iceberg' property in a non-Iceberg catalog, "
              + "or create the table without 'connector'='iceberg' related properties in an Iceberg catalog.");
    }

    createIcebergTable(tablePath, table, ignoreIfExists);
  }

  void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
      throws CatalogException, TableAlreadyExistException {
    validateFlinkTable(table);

    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
    PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);

    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
    String location = null;
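    // The 'location' option maps to the Iceberg table location rather than a table property,
    // so split it out from the remaining options.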
    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
      if ("location".equalsIgnoreCase(entry.getKey())) {
        location = entry.getValue();
      } else {
        properties.put(entry.getKey(), entry.getValue());
      }
    }

    try {
      icebergCatalog.createTable(
          toIdentifier(tablePath), icebergSchema, spec, location, properties.build());
    } catch (AlreadyExistsException e) {
      if (!ignoreIfExists) {
        throw new TableAlreadyExistException(getName(), tablePath, e);
      }
    }
  }

  private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) {
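    // Only property changes are supported by alterTable, so require that columns, watermark
    // specs, primary key, and partition keys are identical between the two tables.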
    TableSchema ts1 = ct1.getSchema();
    TableSchema ts2 = ct2.getSchema();
    boolean equalsPrimary = false;

    if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
      equalsPrimary =
          Objects.equals(ts1.getPrimaryKey().get().getType(), ts2.getPrimaryKey().get().getType())
              && Objects.equals(
                  ts1.getPrimaryKey().get().getColumns(), ts2.getPrimaryKey().get().getColumns());
    } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
      equalsPrimary = true;
    }

    if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
        && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
        && equalsPrimary)) {
      throw new UnsupportedOperationException("Altering schema is not supported yet.");
    }

    if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
      throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
    }
  }

  @Override
  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
      throws CatalogException, TableNotExistException {
    validateFlinkTable(newTable);

    Table icebergTable;
    try {
      icebergTable = loadIcebergTable(tablePath);
    } catch (TableNotExistException e) {
      if (!ignoreIfNotExists) {
        throw e;
      } else {
        return;
      }
    }

    CatalogTable table = toCatalogTable(icebergTable);

    // Currently, Flink SQL only supports altering table properties.

    // With the current Flink Catalog API, adding/removing/renaming columns cannot be detected by
    // comparing CatalogTable instances, unless the Flink schema contains the Iceberg column IDs.
    validateTableSchemaAndPartition(table, (CatalogTable) newTable);

    Map<String, String> oldProperties = table.getOptions();
    Map<String, String> setProperties = Maps.newHashMap();

    String setLocation = null;
    String setSnapshotId = null;
    String pickSnapshotId = null;

    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
      String key = entry.getKey();
      String value = entry.getValue();

      if (Objects.equals(value, oldProperties.get(key))) {
        continue;
      }

      if ("location".equalsIgnoreCase(key)) {
        setLocation = value;
      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
        setSnapshotId = value;
      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
        pickSnapshotId = value;
      } else {
        setProperties.put(key, value);
      }
    }

    oldProperties
        .keySet()
        .forEach(
            k -> {
              if (!newTable.getOptions().containsKey(k)) {
                setProperties.put(k, null);
              }
            });

    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
  }

  private static void validateFlinkTable(CatalogBaseTable table) {
    Preconditions.checkArgument(
        table instanceof CatalogTable, "The Table should be a CatalogTable.");

    TableSchema schema = table.getSchema();
    schema
        .getTableColumns()
        .forEach(
            column -> {
              if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
                throw new UnsupportedOperationException(
                    "Creating table with computed columns is not supported yet.");
              }
            });

    if (!schema.getWatermarkSpecs().isEmpty()) {
      throw new UnsupportedOperationException(
          "Creating table with watermark specs is not supported yet.");
    }
  }

  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
    PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
    partitionKeys.forEach(builder::identity);
    return builder.build();
  }

  private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
    ImmutableList.Builder<String> partitionKeysBuilder = ImmutableList.builder();
    for (PartitionField field : spec.fields()) {
      if (field.transform().isIdentity()) {
        partitionKeysBuilder.add(icebergSchema.findColumnName(field.sourceId()));
      } else {
        // Not created by Flink SQL.
        // For compatibility with Iceberg tables that use non-identity transforms, return an empty list.
        // TODO: revisit this once Flink supports partition transforms.
        return Collections.emptyList();
      }
    }
    return partitionKeysBuilder.build();
  }

  private static void commitChanges(
      Table table,
      String setLocation,
      String setSnapshotId,
      String pickSnapshotId,
      Map<String, String> setProperties) {
    // Don't allow setting the current snapshot and cherry-picking a commit at the same time:
    // the order of the two operations is ambiguous, and choosing one order over the other leads
    // to different results.
    Preconditions.checkArgument(
        setSnapshotId == null || pickSnapshotId == null,
        "Cannot set the current snapshot ID and cherry-pick snapshot changes");

    if (setSnapshotId != null) {
      long newSnapshotId = Long.parseLong(setSnapshotId);
      table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
    }

    // if updating the table snapshot, perform that update first in case it fails
    if (pickSnapshotId != null) {
      long newSnapshotId = Long.parseLong(pickSnapshotId);
      table.manageSnapshots().cherrypick(newSnapshotId).commit();
    }

    Transaction transaction = table.newTransaction();

    if (setLocation != null) {
      transaction.updateLocation().setLocation(setLocation).commit();
    }

    if (!setProperties.isEmpty()) {
      UpdateProperties updateProperties = transaction.updateProperties();
      setProperties.forEach(
          (k, v) -> {
            if (v == null) {
              updateProperties.remove(k);
            } else {
              updateProperties.set(k, v);
            }
          });
      updateProperties.commit();
    }

    transaction.commitTransaction();
  }

  static CatalogTable toCatalogTable(Table table) {
    TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());

    // NOTE: We cannot create an IcebergCatalogTable that extends CatalogTable, because the Flink
    // optimizer may use CatalogTableImpl to copy a new catalog table.
    // Instead, re-load the table from the Iceberg catalog when creating source/sink operators.
    // Iceberg tables have no comment, so pass null (the default comment value in Flink).
    return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
  }

  @Override
  public Optional<Factory> getFactory() {
    return Optional.of(new FlinkDynamicTableFactory(this));
  }

  CatalogLoader getCatalogLoader() {
    return catalogLoader;
  }

  // ------------------------------ Unsupported methods ---------------------------------------------

  @Override
  public List<String> listViews(String databaseName) throws CatalogException {
    return Collections.emptyList();
  }

  @Override
  public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void createPartition(
      ObjectPath tablePath,
      CatalogPartitionSpec partitionSpec,
      CatalogPartition partition,
      boolean ignoreIfExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void dropPartition(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterPartition(
      ObjectPath tablePath,
      CatalogPartitionSpec partitionSpec,
      CatalogPartition newPartition,
      boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<String> listFunctions(String dbName) throws CatalogException {
    return Collections.emptyList();
  }

  @Override
  public CatalogFunction getFunction(ObjectPath functionPath)
      throws FunctionNotExistException, CatalogException {
    throw new FunctionNotExistException(getName(), functionPath);
  }

  @Override
  public boolean functionExists(ObjectPath functionPath) throws CatalogException {
    return false;
  }

  @Override
  public void createFunction(
      ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterFunction(
      ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterTableStatistics(
      ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterTableColumnStatistics(
      ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterPartitionStatistics(
      ObjectPath tablePath,
      CatalogPartitionSpec partitionSpec,
      CatalogTableStatistics partitionStatistics,
      boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterPartitionColumnStatistics(
      ObjectPath tablePath,
      CatalogPartitionSpec partitionSpec,
      CatalogColumnStatistics columnStatistics,
      boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
      throws TableNotExistException, TableNotPartitionedException, CatalogException {
    Table table = loadIcebergTable(tablePath);

    if (table.spec().isUnpartitioned()) {
      throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
    }

    Set<CatalogPartitionSpec> set = Sets.newHashSet();
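    // Derive the partition specs by planning a file scan and reading each data file's
    // partition tuple; duplicates are collapsed via the set.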
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
        Map<String, String> map = Maps.newHashMap();
        StructLike structLike = dataFile.partition();
        PartitionSpec spec = table.specs().get(dataFile.specId());
        for (int i = 0; i < structLike.size(); i++) {
          map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
        }
        set.add(new CatalogPartitionSpec(map));
      }
    } catch (IOException e) {
      throw new CatalogException(
          String.format("Failed to list partitions of table %s", tablePath), e);
    }

    return Lists.newArrayList(set);
  }

  @Override
  public List<CatalogPartitionSpec> listPartitions(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<CatalogPartitionSpec> listPartitionsByFilter(
      ObjectPath tablePath, List<Expression> filters) throws CatalogException {
    throw new UnsupportedOperationException();
  }

  // After partition pruning and filter push-down, the statistics become very inaccurate, so the
  // statistics returned here are of little significance.
  // Flink will support something like SupportsReportStatistics in the future.

  @Override
  public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
    return CatalogTableStatistics.UNKNOWN;
  }

  @Override
  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
      throws CatalogException {
    return CatalogColumnStatistics.UNKNOWN;
  }

  @Override
  public CatalogTableStatistics getPartitionStatistics(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
    return CatalogTableStatistics.UNKNOWN;
  }

  @Override
  public CatalogColumnStatistics getPartitionColumnStatistics(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
    return CatalogColumnStatistics.UNKNOWN;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java [93:804]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
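/**
 * A Flink Catalog implementation that wraps an Iceberg {@link Catalog}.
 *
 * <p>Flink databases are mapped to Iceberg namespaces nested under the configured base namespace,
 * and Flink tables are mapped to Iceberg tables within those namespaces. (Summary comment added
 * here; the class Javadoc sits above the excerpted line range in the source file.)
 */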
public class FlinkCatalog extends AbstractCatalog {

  private final CatalogLoader catalogLoader;
  private final Catalog icebergCatalog;
  private final Namespace baseNamespace;
  private final SupportsNamespaces asNamespaceCatalog;
  private final Closeable closeable;
  private final boolean cacheEnabled;

  public FlinkCatalog(
      String catalogName,
      String defaultDatabase,
      Namespace baseNamespace,
      CatalogLoader catalogLoader,
      boolean cacheEnabled,
      long cacheExpirationIntervalMs) {
    super(catalogName, defaultDatabase);
    this.catalogLoader = catalogLoader;
    this.baseNamespace = baseNamespace;
    this.cacheEnabled = cacheEnabled;

    Catalog originalCatalog = catalogLoader.loadCatalog();
    icebergCatalog =
        cacheEnabled
            ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
            : originalCatalog;
    asNamespaceCatalog =
        originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
    closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;

    FlinkEnvironmentContext.init();
  }

  @Override
  public void open() throws CatalogException {}

  @Override
  public void close() throws CatalogException {
    if (closeable != null) {
      try {
        closeable.close();
      } catch (IOException e) {
        throw new CatalogException(e);
      }
    }
  }

  public Catalog catalog() {
    return icebergCatalog;
  }

  /** Appends a new level to the base namespace. */
  private static Namespace appendLevel(Namespace baseNamespace, String newLevel) {
    String[] namespace = new String[baseNamespace.levels().length + 1];
    System.arraycopy(baseNamespace.levels(), 0, namespace, 0, baseNamespace.levels().length);
    namespace[baseNamespace.levels().length] = newLevel;
    return Namespace.of(namespace);
  }

  TableIdentifier toIdentifier(ObjectPath path) {
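    // Iceberg metadata tables are exposed through a '$' naming convention
    // (e.g. `my_table$snapshots`), so split the object name on '$' to detect them.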
    String objectName = path.getObjectName();
    List<String> tableName = Splitter.on('$').splitToList(objectName);

    if (tableName.size() == 1) {
      return TableIdentifier.of(
          appendLevel(baseNamespace, path.getDatabaseName()), path.getObjectName());
    } else if (tableName.size() == 2 && MetadataTableType.from(tableName.get(1)) != null) {
      return TableIdentifier.of(
          appendLevel(appendLevel(baseNamespace, path.getDatabaseName()), tableName.get(0)),
          tableName.get(1));
    } else {
      throw new IllegalArgumentException("Illegal table name: " + objectName);
    }
  }

  @Override
  public List<String> listDatabases() throws CatalogException {
    if (asNamespaceCatalog == null) {
      return Collections.singletonList(getDefaultDatabase());
    }

    return asNamespaceCatalog.listNamespaces(baseNamespace).stream()
        .map(n -> n.level(n.levels().length - 1))
        .collect(Collectors.toList());
  }

  @Override
  public CatalogDatabase getDatabase(String databaseName)
      throws DatabaseNotExistException, CatalogException {
    if (asNamespaceCatalog == null) {
      if (!getDefaultDatabase().equals(databaseName)) {
        throw new DatabaseNotExistException(getName(), databaseName);
      } else {
        return new CatalogDatabaseImpl(Maps.newHashMap(), "");
      }
    } else {
      try {
        Map<String, String> metadata =
            Maps.newHashMap(
                asNamespaceCatalog.loadNamespaceMetadata(appendLevel(baseNamespace, databaseName)));
        String comment = metadata.remove("comment");
        return new CatalogDatabaseImpl(metadata, comment);
      } catch (NoSuchNamespaceException e) {
        throw new DatabaseNotExistException(getName(), databaseName, e);
      }
    }
  }

  @Override
  public boolean databaseExists(String databaseName) throws CatalogException {
    try {
      getDatabase(databaseName);
      return true;
    } catch (DatabaseNotExistException ignore) {
      return false;
    }
  }

  @Override
  public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
      throws DatabaseAlreadyExistException, CatalogException {
    createDatabase(
        name, mergeComment(database.getProperties(), database.getComment()), ignoreIfExists);
  }

  private void createDatabase(
      String databaseName, Map<String, String> metadata, boolean ignoreIfExists)
      throws DatabaseAlreadyExistException, CatalogException {
    if (asNamespaceCatalog != null) {
      try {
        asNamespaceCatalog.createNamespace(appendLevel(baseNamespace, databaseName), metadata);
      } catch (AlreadyExistsException e) {
        if (!ignoreIfExists) {
          throw new DatabaseAlreadyExistException(getName(), databaseName, e);
        }
      }
    } else {
      throw new UnsupportedOperationException(
          "Namespaces are not supported by catalog: " + getName());
    }
  }

  private Map<String, String> mergeComment(Map<String, String> metadata, String comment) {
    Map<String, String> ret = Maps.newHashMap(metadata);
    if (metadata.containsKey("comment")) {
      throw new CatalogException("Database properties should not contain key: 'comment'.");
    }

    if (!StringUtils.isNullOrWhitespaceOnly(comment)) {
      ret.put("comment", comment);
    }
    return ret;
  }

  @Override
  public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
      throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
    if (asNamespaceCatalog != null) {
      try {
        boolean success = asNamespaceCatalog.dropNamespace(appendLevel(baseNamespace, name));
        if (!success && !ignoreIfNotExists) {
          throw new DatabaseNotExistException(getName(), name);
        }
      } catch (NoSuchNamespaceException e) {
        if (!ignoreIfNotExists) {
          throw new DatabaseNotExistException(getName(), name, e);
        }
      } catch (NamespaceNotEmptyException e) {
        throw new DatabaseNotEmptyException(getName(), name, e);
      }
    } else {
      if (!ignoreIfNotExists) {
        throw new DatabaseNotExistException(getName(), name);
      }
    }
  }

  @Override
  public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
      throws DatabaseNotExistException, CatalogException {
    if (asNamespaceCatalog != null) {
      Namespace namespace = appendLevel(baseNamespace, name);
      Map<String, String> updates = Maps.newHashMap();
      Set<String> removals = Sets.newHashSet();

      try {
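        // Diff the existing namespace metadata against the requested properties: keys missing
        // from the new set become removals, changed or added keys become updates.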
        Map<String, String> oldProperties = asNamespaceCatalog.loadNamespaceMetadata(namespace);
        Map<String, String> newProperties =
            mergeComment(newDatabase.getProperties(), newDatabase.getComment());

        for (String key : oldProperties.keySet()) {
          if (!newProperties.containsKey(key)) {
            removals.add(key);
          }
        }

        for (Map.Entry<String, String> entry : newProperties.entrySet()) {
          if (!entry.getValue().equals(oldProperties.get(entry.getKey()))) {
            updates.put(entry.getKey(), entry.getValue());
          }
        }

        if (!updates.isEmpty()) {
          asNamespaceCatalog.setProperties(namespace, updates);
        }

        if (!removals.isEmpty()) {
          asNamespaceCatalog.removeProperties(namespace, removals);
        }

      } catch (NoSuchNamespaceException e) {
        if (!ignoreIfNotExists) {
          throw new DatabaseNotExistException(getName(), name, e);
        }
      }
    } else {
      if (getDefaultDatabase().equals(name)) {
        throw new CatalogException(
            "Cannot alter the default database when the Iceberg catalog doesn't support namespaces.");
      }
      if (!ignoreIfNotExists) {
        throw new DatabaseNotExistException(getName(), name);
      }
    }
  }

  @Override
  public List<String> listTables(String databaseName)
      throws DatabaseNotExistException, CatalogException {
    try {
      return icebergCatalog.listTables(appendLevel(baseNamespace, databaseName)).stream()
          .map(TableIdentifier::name)
          .collect(Collectors.toList());
    } catch (NoSuchNamespaceException e) {
      throw new DatabaseNotExistException(getName(), databaseName, e);
    }
  }

  @Override
  public CatalogTable getTable(ObjectPath tablePath)
      throws TableNotExistException, CatalogException {
    Table table = loadIcebergTable(tablePath);
    return toCatalogTable(table);
  }

  private Table loadIcebergTable(ObjectPath tablePath) throws TableNotExistException {
    try {
      Table table = icebergCatalog.loadTable(toIdentifier(tablePath));
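      // A caching catalog may return a stale Table instance, so refresh it to make sure
      // callers observe the latest table metadata.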
      if (cacheEnabled) {
        table.refresh();
      }

      return table;
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      throw new TableNotExistException(getName(), tablePath, e);
    }
  }

  @Override
  public boolean tableExists(ObjectPath tablePath) throws CatalogException {
    return icebergCatalog.tableExists(toIdentifier(tablePath));
  }

  @Override
  public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
      throws TableNotExistException, CatalogException {
    try {
      icebergCatalog.dropTable(toIdentifier(tablePath));
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      if (!ignoreIfNotExists) {
        throw new TableNotExistException(getName(), tablePath, e);
      }
    }
  }

  @Override
  public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
      throws TableNotExistException, TableAlreadyExistException, CatalogException {
    try {
      icebergCatalog.renameTable(
          toIdentifier(tablePath),
          toIdentifier(new ObjectPath(tablePath.getDatabaseName(), newTableName)));
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      if (!ignoreIfNotExists) {
        throw new TableNotExistException(getName(), tablePath, e);
      }
    } catch (AlreadyExistsException e) {
      throw new TableAlreadyExistException(getName(), tablePath, e);
    }
  }

  @Override
  public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
      throws CatalogException, TableAlreadyExistException {
    if (Objects.equals(
        table.getOptions().get("connector"), FlinkDynamicTableFactory.FACTORY_IDENTIFIER)) {
      throw new IllegalArgumentException(
          "Cannot create a table with the 'connector'='iceberg' property in an Iceberg catalog. "
              + "Either create the table with the 'connector'='iceberg' property in a non-Iceberg catalog, "
              + "or create the table without 'connector'='iceberg' related properties in an Iceberg catalog.");
    }

    createIcebergTable(tablePath, table, ignoreIfExists);
  }

  void createIcebergTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
      throws CatalogException, TableAlreadyExistException {
    validateFlinkTable(table);

    Schema icebergSchema = FlinkSchemaUtil.convert(table.getSchema());
    PartitionSpec spec = toPartitionSpec(((CatalogTable) table).getPartitionKeys(), icebergSchema);

    ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
    String location = null;
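    // The 'location' option maps to the Iceberg table location rather than a table property,
    // so split it out from the remaining options.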
    for (Map.Entry<String, String> entry : table.getOptions().entrySet()) {
      if ("location".equalsIgnoreCase(entry.getKey())) {
        location = entry.getValue();
      } else {
        properties.put(entry.getKey(), entry.getValue());
      }
    }

    try {
      icebergCatalog.createTable(
          toIdentifier(tablePath), icebergSchema, spec, location, properties.build());
    } catch (AlreadyExistsException e) {
      if (!ignoreIfExists) {
        throw new TableAlreadyExistException(getName(), tablePath, e);
      }
    }
  }

  private static void validateTableSchemaAndPartition(CatalogTable ct1, CatalogTable ct2) {
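    // Only property changes are supported by alterTable, so require that columns, watermark
    // specs, primary key, and partition keys are identical between the two tables.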
    TableSchema ts1 = ct1.getSchema();
    TableSchema ts2 = ct2.getSchema();
    boolean equalsPrimary = false;

    if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
      equalsPrimary =
          Objects.equals(ts1.getPrimaryKey().get().getType(), ts2.getPrimaryKey().get().getType())
              && Objects.equals(
                  ts1.getPrimaryKey().get().getColumns(), ts2.getPrimaryKey().get().getColumns());
    } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
      equalsPrimary = true;
    }

    if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
        && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
        && equalsPrimary)) {
      throw new UnsupportedOperationException("Altering schema is not supported yet.");
    }

    if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
      throw new UnsupportedOperationException("Altering partition keys is not supported yet.");
    }
  }

  @Override
  public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
      throws CatalogException, TableNotExistException {
    validateFlinkTable(newTable);

    Table icebergTable;
    try {
      icebergTable = loadIcebergTable(tablePath);
    } catch (TableNotExistException e) {
      if (!ignoreIfNotExists) {
        throw e;
      } else {
        return;
      }
    }

    CatalogTable table = toCatalogTable(icebergTable);

    // Currently, Flink SQL only supports altering table properties.

    // With the current Flink Catalog API, adding/removing/renaming columns cannot be detected by
    // comparing CatalogTable instances, unless the Flink schema contains the Iceberg column IDs.
    validateTableSchemaAndPartition(table, (CatalogTable) newTable);

    Map<String, String> oldProperties = table.getOptions();
    Map<String, String> setProperties = Maps.newHashMap();

    String setLocation = null;
    String setSnapshotId = null;
    String pickSnapshotId = null;

    for (Map.Entry<String, String> entry : newTable.getOptions().entrySet()) {
      String key = entry.getKey();
      String value = entry.getValue();

      if (Objects.equals(value, oldProperties.get(key))) {
        continue;
      }

      if ("location".equalsIgnoreCase(key)) {
        setLocation = value;
      } else if ("current-snapshot-id".equalsIgnoreCase(key)) {
        setSnapshotId = value;
      } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(key)) {
        pickSnapshotId = value;
      } else {
        setProperties.put(key, value);
      }
    }

    oldProperties
        .keySet()
        .forEach(
            k -> {
              if (!newTable.getOptions().containsKey(k)) {
                setProperties.put(k, null);
              }
            });

    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties);
  }

  private static void validateFlinkTable(CatalogBaseTable table) {
    Preconditions.checkArgument(
        table instanceof CatalogTable, "The Table should be a CatalogTable.");

    TableSchema schema = table.getSchema();
    schema
        .getTableColumns()
        .forEach(
            column -> {
              if (!FlinkCompatibilityUtil.isPhysicalColumn(column)) {
                throw new UnsupportedOperationException(
                    "Creating table with computed columns is not supported yet.");
              }
            });

    if (!schema.getWatermarkSpecs().isEmpty()) {
      throw new UnsupportedOperationException(
          "Creating table with watermark specs is not supported yet.");
    }
  }

  private static PartitionSpec toPartitionSpec(List<String> partitionKeys, Schema icebergSchema) {
    PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
    partitionKeys.forEach(builder::identity);
    return builder.build();
  }

  private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
    ImmutableList.Builder<String> partitionKeysBuilder = ImmutableList.builder();
    for (PartitionField field : spec.fields()) {
      if (field.transform().isIdentity()) {
        partitionKeysBuilder.add(icebergSchema.findColumnName(field.sourceId()));
      } else {
        // Not created by Flink SQL.
        // For compatibility with Iceberg tables that use non-identity transforms, return an empty list.
        // TODO: revisit this once Flink supports partition transforms.
        return Collections.emptyList();
      }
    }
    return partitionKeysBuilder.build();
  }

  private static void commitChanges(
      Table table,
      String setLocation,
      String setSnapshotId,
      String pickSnapshotId,
      Map<String, String> setProperties) {
    // Don't allow setting the current snapshot and cherry-picking a commit at the same time:
    // the order of the two operations is ambiguous, and choosing one order over the other leads
    // to different results.
    Preconditions.checkArgument(
        setSnapshotId == null || pickSnapshotId == null,
        "Cannot set the current snapshot ID and cherry-pick snapshot changes");

    if (setSnapshotId != null) {
      long newSnapshotId = Long.parseLong(setSnapshotId);
      table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
    }

    // if updating the table snapshot, perform that update first in case it fails
    if (pickSnapshotId != null) {
      long newSnapshotId = Long.parseLong(pickSnapshotId);
      table.manageSnapshots().cherrypick(newSnapshotId).commit();
    }

    Transaction transaction = table.newTransaction();

    if (setLocation != null) {
      transaction.updateLocation().setLocation(setLocation).commit();
    }

    if (!setProperties.isEmpty()) {
      UpdateProperties updateProperties = transaction.updateProperties();
      setProperties.forEach(
          (k, v) -> {
            if (v == null) {
              updateProperties.remove(k);
            } else {
              updateProperties.set(k, v);
            }
          });
      updateProperties.commit();
    }

    transaction.commitTransaction();
  }

  static CatalogTable toCatalogTable(Table table) {
    TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
    List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());

    // NOTE: We cannot create an IcebergCatalogTable that extends CatalogTable, because the Flink
    // optimizer may use CatalogTableImpl to copy a new catalog table.
    // Instead, re-load the table from the Iceberg catalog when creating source/sink operators.
    // Iceberg tables have no comment, so pass null (the default comment value in Flink).
    return new CatalogTableImpl(schema, partitionKeys, table.properties(), null);
  }

  @Override
  public Optional<Factory> getFactory() {
    return Optional.of(new FlinkDynamicTableFactory(this));
  }

  CatalogLoader getCatalogLoader() {
    return catalogLoader;
  }

  // ------------------------------ Unsupported methods ---------------------------------------------

  @Override
  public List<String> listViews(String databaseName) throws CatalogException {
    return Collections.emptyList();
  }

  @Override
  public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void createPartition(
      ObjectPath tablePath,
      CatalogPartitionSpec partitionSpec,
      CatalogPartition partition,
      boolean ignoreIfExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void dropPartition(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterPartition(
      ObjectPath tablePath,
      CatalogPartitionSpec partitionSpec,
      CatalogPartition newPartition,
      boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<String> listFunctions(String dbName) throws CatalogException {
    return Collections.emptyList();
  }

  @Override
  public CatalogFunction getFunction(ObjectPath functionPath)
      throws FunctionNotExistException, CatalogException {
    throw new FunctionNotExistException(getName(), functionPath);
  }

  @Override
  public boolean functionExists(ObjectPath functionPath) throws CatalogException {
    return false;
  }

  @Override
  public void createFunction(
      ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterFunction(
      ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterTableStatistics(
      ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterTableColumnStatistics(
      ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterPartitionStatistics(
      ObjectPath tablePath,
      CatalogPartitionSpec partitionSpec,
      CatalogTableStatistics partitionStatistics,
      boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void alterPartitionColumnStatistics(
      ObjectPath tablePath,
      CatalogPartitionSpec partitionSpec,
      CatalogColumnStatistics columnStatistics,
      boolean ignoreIfNotExists)
      throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
      throws TableNotExistException, TableNotPartitionedException, CatalogException {
    Table table = loadIcebergTable(tablePath);

    if (table.spec().isUnpartitioned()) {
      throw new TableNotPartitionedException(icebergCatalog.name(), tablePath);
    }

    Set<CatalogPartitionSpec> set = Sets.newHashSet();
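    // Derive the partition specs by planning a file scan and reading each data file's
    // partition tuple; duplicates are collapsed via the set.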
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (DataFile dataFile : CloseableIterable.transform(tasks, FileScanTask::file)) {
        Map<String, String> map = Maps.newHashMap();
        StructLike structLike = dataFile.partition();
        PartitionSpec spec = table.specs().get(dataFile.specId());
        for (int i = 0; i < structLike.size(); i++) {
          map.put(spec.fields().get(i).name(), String.valueOf(structLike.get(i, Object.class)));
        }
        set.add(new CatalogPartitionSpec(map));
      }
    } catch (IOException e) {
      throw new CatalogException(
          String.format("Failed to list partitions of table %s", tablePath), e);
    }

    return Lists.newArrayList(set);
  }

  @Override
  public List<CatalogPartitionSpec> listPartitions(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
    throw new UnsupportedOperationException();
  }

  @Override
  public List<CatalogPartitionSpec> listPartitionsByFilter(
      ObjectPath tablePath, List<Expression> filters) throws CatalogException {
    throw new UnsupportedOperationException();
  }

  // After partition pruning and filter push-down, the statistics become very inaccurate, so the
  // statistics returned here are of little significance.
  // Flink will support something like SupportsReportStatistics in the future.

  @Override
  public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws CatalogException {
    return CatalogTableStatistics.UNKNOWN;
  }

  @Override
  public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
      throws CatalogException {
    return CatalogColumnStatistics.UNKNOWN;
  }

  @Override
  public CatalogTableStatistics getPartitionStatistics(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
    return CatalogTableStatistics.UNKNOWN;
  }

  @Override
  public CatalogColumnStatistics getPartitionColumnStatistics(
      ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
    return CatalogColumnStatistics.UNKNOWN;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



