public void syncTables()

in polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java [1122:1289]


  /**
   * Synchronizes the tables of a single namespace from the source catalog to the target catalog.
   *
   * <p>The tables are listed on both sides, a {@link SynchronizationPlan} is obtained from the
   * sync planner, and the plan is then carried out in this order: tables to create are registered
   * on the target, tables to overwrite are dropped (without purge) and re-registered on the
   * target, and tables to remove are dropped (without purge) from the target. Tables the plan
   * marks as skipped are only logged.
   *
   * <p>Failure handling is governed by {@code haltOnFailure}: when it is set, the first exception
   * is rethrown to the caller; otherwise the failure is logged at error level and processing
   * continues with the next table. A failure to list tables on either side ends the method early,
   * because no plan can be computed without both listings.
   *
   * @param catalogName name of the catalog being synchronized; used for planning, ETag
   *     bookkeeping and log messages
   * @param namespace namespace whose tables are synchronized
   * @param sourceIcebergCatalogService service used to list and load tables on the source
   * @param targetIcebergCatalogService service used to list, register and drop tables on the
   *     target
   */
  public void syncTables(
      String catalogName,
      Namespace namespace,
      IcebergCatalogService sourceIcebergCatalogService,
      IcebergCatalogService targetIcebergCatalogService) {
    Set<TableIdentifier> sourceTables;

    try {
      sourceTables = new HashSet<>(sourceIcebergCatalogService.listTables(namespace));
      clientLogger.info(
          "Listed {} tables in namespace {} for catalog {} on source.",
          sourceTables.size(),
          namespace,
          catalogName);
    } catch (Exception e) {
      if (haltOnFailure) {
        throw e;
      }
      clientLogger.error(
          "Failed to list tables in namespace {} for catalog {} on source.",
          namespace,
          catalogName,
          e);
      return;
    }

    Set<TableIdentifier> targetTables;

    try {
      targetTables = new HashSet<>(targetIcebergCatalogService.listTables(namespace));
      clientLogger.info(
          "Listed {} tables in namespace {} for catalog {} on target.",
          targetTables.size(),
          namespace,
          catalogName);
    } catch (Exception e) {
      if (haltOnFailure) {
        throw e;
      }
      clientLogger.error(
          "Failed to list tables in namespace {} for catalog {} on target.",
          namespace,
          catalogName,
          e);
      return;
    }

    SynchronizationPlan<TableIdentifier> tableSyncPlan =
        syncPlanner.planTableSync(catalogName, namespace, sourceTables, targetTables);

    tableSyncPlan
        .entitiesToSkip()
        .forEach(
            tableId ->
                clientLogger.info(
                    "Skipping table {} in namespace {} in catalog {}.",
                    tableId,
                    namespace,
                    catalogName));

    // Progress counter shared by all three phases; it counts every attempted table, including
    // failed and unmodified ones, so the "x/y" suffix in the logs always advances.
    int syncsCompleted = 0;
    int totalSyncsToComplete = totalSyncsToComplete(tableSyncPlan);

    // Phase 1: register tables that the plan marks for creation on the target.
    for (TableIdentifier tableId : tableSyncPlan.entitiesToCreate()) {
      try {
        Table table = sourceIcebergCatalogService.loadTable(tableId);

        // Registration needs the current metadata file location, which is only reachable
        // through BaseTable's table operations.
        if (table instanceof BaseTable baseTable) {
          targetIcebergCatalogService.registerTable(
              tableId, baseTable.operations().current().metadataFileLocation());
        } else {
          throw new IllegalStateException("Cannot register table that does not extend BaseTable.");
        }

        // Remember the ETag of the loaded table when the source provided one.
        if (table instanceof BaseTableWithETag tableWithETag) {
          etagManager.storeETag(catalogName, tableId, tableWithETag.etag());
        }

        clientLogger.info(
            "Registered table {} in namespace {} in catalog {}. - {}/{}",
            tableId,
            namespace,
            catalogName,
            ++syncsCompleted,
            totalSyncsToComplete);
      } catch (Exception e) {
        if (haltOnFailure) {
          throw e;
        }
        clientLogger.error(
            "Failed to register table {} in namespace {} in catalog {}. - {}/{}",
            tableId,
            namespace,
            catalogName,
            ++syncsCompleted,
            totalSyncsToComplete,
            e);
      }
    }

    // Phase 2: drop and re-register tables that the plan marks for overwrite.
    for (TableIdentifier tableId : tableSyncPlan.entitiesToOverwrite()) {
      try {
        Table table;

        if (this.diffOnly
            && sourceIcebergCatalogService
                instanceof PolarisIcebergCatalogService polarisCatalogService) {
          // Load with the previously stored ETag.
          // NOTE(review): presumably this load raises MetadataNotModifiedException when the
          // source metadata is unchanged - confirm against PolarisIcebergCatalogService.
          String etag = etagManager.getETag(catalogName, tableId);
          table = polarisCatalogService.loadTable(tableId, etag);
        } else {
          table = sourceIcebergCatalogService.loadTable(tableId);
        }

        if (table instanceof BaseTable baseTable) {
          targetIcebergCatalogService.dropTableWithoutPurge(tableId);
          targetIcebergCatalogService.registerTable(
              tableId, baseTable.operations().current().metadataFileLocation());
        } else {
          throw new IllegalStateException("Cannot register table that does not extend BaseTable.");
        }

        if (table instanceof BaseTableWithETag tableWithETag) {
          etagManager.storeETag(catalogName, tableId, tableWithETag.etag());
        }

        clientLogger.info(
            "Dropped and re-registered table {} in namespace {} in catalog {}. - {}/{}",
            tableId,
            namespace,
            catalogName,
            ++syncsCompleted,
            totalSyncsToComplete);
      } catch (MetadataNotModifiedException e) {
        // Not a failure: the table is left untouched on the target, and haltOnFailure is
        // deliberately not consulted here.
        clientLogger.info(
            "Table {} in namespace {} in catalog {} was not modified, not overwriting in target catalog. - {}/{}",
            tableId,
            namespace,
            catalogName,
            ++syncsCompleted,
            totalSyncsToComplete);
      } catch (Exception e) {
        if (haltOnFailure) {
          throw e;
        }
        clientLogger.error(
            "Failed to drop and re-register table {} in namespace {} in catalog {}. - {}/{}",
            tableId,
            namespace,
            catalogName,
            ++syncsCompleted,
            totalSyncsToComplete,
            e);
      }
    }

    // Phase 3: drop tables that the plan marks for removal from the target.
    for (TableIdentifier tableId : tableSyncPlan.entitiesToRemove()) {
      try {
        targetIcebergCatalogService.dropTableWithoutPurge(tableId);
        clientLogger.info(
            "Dropped table {} in namespace {} in catalog {}. - {}/{}",
            tableId,
            namespace,
            catalogName,
            ++syncsCompleted,
            totalSyncsToComplete);
      } catch (Exception e) {
        if (haltOnFailure) {
          throw e;
        }
        // Logged at error level, like every other failure branch in this method.
        clientLogger.error(
            "Failed to drop table {} in namespace {} in catalog {}. - {}/{}",
            tableId,
            namespace,
            catalogName,
            ++syncsCompleted,
            totalSyncsToComplete,
            e);
      }
    }
  }