in polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java [1122:1289]
public void syncTables(
String catalogName,
Namespace namespace,
IcebergCatalogService sourceIcebergCatalogService,
IcebergCatalogService targetIcebergCatalogService) {
Set<TableIdentifier> sourceTables;
try {
sourceTables = new HashSet<>(sourceIcebergCatalogService.listTables(namespace));
clientLogger.info(
"Listed {} tables in namespace {} for catalog {} on source.",
sourceTables.size(),
namespace,
catalogName);
} catch (Exception e) {
if (haltOnFailure) throw e;
clientLogger.error(
"Failed to list tables in namespace {} for catalog {} on source.",
namespace,
catalogName,
e);
return;
}
Set<TableIdentifier> targetTables;
try {
targetTables = new HashSet<>(targetIcebergCatalogService.listTables(namespace));
clientLogger.info(
"Listed {} tables in namespace {} for catalog {} on target.",
targetTables.size(),
namespace,
catalogName);
} catch (Exception e) {
if (haltOnFailure) throw e;
clientLogger.error(
"Failed to list tables in namespace {} for catalog {} on target.",
namespace,
catalogName,
e);
return;
}
SynchronizationPlan<TableIdentifier> tableSyncPlan =
syncPlanner.planTableSync(catalogName, namespace, sourceTables, targetTables);
tableSyncPlan
.entitiesToSkip()
.forEach(
tableId ->
clientLogger.info(
"Skipping table {} in namespace {} in catalog {}.",
tableId,
namespace,
catalogName));
int syncsCompleted = 0;
int totalSyncsToComplete = totalSyncsToComplete(tableSyncPlan);
for (TableIdentifier tableId : tableSyncPlan.entitiesToCreate()) {
try {
Table table = sourceIcebergCatalogService.loadTable(tableId);
if (table instanceof BaseTable baseTable) {
targetIcebergCatalogService.registerTable(
tableId, baseTable.operations().current().metadataFileLocation());
} else {
throw new IllegalStateException("Cannot register table that does not extend BaseTable.");
}
if (table instanceof BaseTableWithETag tableWithETag) {
etagManager.storeETag(catalogName, tableId, tableWithETag.etag());
}
clientLogger.info(
"Registered table {} in namespace {} in catalog {}. - {}/{}",
tableId,
namespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete);
} catch (Exception e) {
if (haltOnFailure) throw e;
clientLogger.error(
"Failed to register table {} in namespace {} in catalog {}. - {}/{}",
tableId,
namespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete,
e);
}
}
for (TableIdentifier tableId : tableSyncPlan.entitiesToOverwrite()) {
try {
Table table;
if (this.diffOnly && sourceIcebergCatalogService instanceof PolarisIcebergCatalogService polarisCatalogService) {
String etag = etagManager.getETag(catalogName, tableId);
table = polarisCatalogService.loadTable(tableId, etag);
} else {
table = sourceIcebergCatalogService.loadTable(tableId);
}
if (table instanceof BaseTable baseTable) {
targetIcebergCatalogService.dropTableWithoutPurge(tableId);
targetIcebergCatalogService.registerTable(
tableId, baseTable.operations().current().metadataFileLocation());
} else {
throw new IllegalStateException("Cannot register table that does not extend BaseTable.");
}
if (table instanceof BaseTableWithETag tableWithETag) {
etagManager.storeETag(catalogName, tableId, tableWithETag.etag());
}
clientLogger.info(
"Dropped and re-registered table {} in namespace {} in catalog {}. - {}/{}",
tableId,
namespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete);
} catch (MetadataNotModifiedException e) {
clientLogger.info(
"Table {} in namespace {} in catalog {} with was not modified, not overwriting in target catalog. - {}/{}",
tableId,
namespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete);
} catch (Exception e) {
if (haltOnFailure) throw e;
clientLogger.error(
"Failed to drop and re-register table {} in namespace {} in catalog {}. - {}/{}",
tableId,
namespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete,
e);
}
}
for (TableIdentifier table : tableSyncPlan.entitiesToRemove()) {
try {
targetIcebergCatalogService.dropTableWithoutPurge(table);
clientLogger.info(
"Dropped table {} in namespace {} in catalog {}. - {}/{}",
table,
namespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete);
} catch (Exception e) {
if (haltOnFailure) throw e;
clientLogger.info(
"Failed to drop table {} in namespace {} in catalog {}. - {}/{}",
table,
namespace,
catalogName,
++syncsCompleted,
totalSyncsToComplete,
e);
}
}
}