in polaris-synchronizer/api/src/main/java/org/apache/polaris/tools/sync/polaris/PolarisSynchronizer.java [954:1112]
/**
 * Synchronizes the namespaces listed under {@code parentNamespace} from the source to the target,
 * then descends into every namespace the plan marks for child synchronization.
 *
 * <p>The method lists the namespaces under {@code parentNamespace} on both sides, asks the sync
 * planner for a plan, and applies the plan's creates, overwrites and removals against the target.
 * For each namespace returned by {@code entitiesToSyncChildren()} it then calls {@code syncTables}
 * followed by a recursive {@code syncNamespaces}.
 *
 * <p>Failure handling: when {@code haltOnFailure} is set, a caught exception is rethrown.
 * Otherwise the failure is logged; a listing failure ends this invocation, while a failure on a
 * single namespace only skips that namespace. The progress counter shown in the log output
 * advances on failures as well as on successes.
 *
 * @param catalogName catalog name handed to the planner, the log messages and the nested calls
 * @param parentNamespace namespace whose listed namespaces are synchronized
 * @param sourceIcebergCatalogService service used to list namespaces and load their metadata on
 *     the source side
 * @param targetIcebergCatalogService service used to list, create, update and drop namespaces on
 *     the target side
 */
public void syncNamespaces(
    String catalogName,
    Namespace parentNamespace,
    IcebergCatalogService sourceIcebergCatalogService,
    IcebergCatalogService targetIcebergCatalogService) {
  // Step 1: list the namespaces on the source side.
  List<Namespace> sourceChildren;
  try {
    sourceChildren = sourceIcebergCatalogService.listNamespaces(parentNamespace);
    int sourceCount = sourceChildren.size();
    clientLogger.info(
        "Listed {} namespaces in namespace {} for catalog {} from source.",
        sourceCount,
        parentNamespace,
        catalogName);
  } catch (Exception listFailure) {
    if (haltOnFailure) {
      throw listFailure;
    }
    clientLogger.error(
        "Failed to list namespaces in namespace {} for catalog {} from source.",
        parentNamespace,
        catalogName,
        listFailure);
    return;
  }

  // Step 2: list the namespaces on the target side.
  List<Namespace> targetChildren;
  try {
    targetChildren = targetIcebergCatalogService.listNamespaces(parentNamespace);
    int targetCount = targetChildren.size();
    clientLogger.info(
        "Listed {} namespaces in namespace {} for catalog {} from target.",
        targetCount,
        parentNamespace,
        catalogName);
  } catch (Exception listFailure) {
    if (haltOnFailure) {
      throw listFailure;
    }
    clientLogger.error(
        "Failed to list namespaces in namespace {} for catalog {} from target.",
        parentNamespace,
        catalogName,
        listFailure);
    return;
  }

  // Step 3: let the planner decide what to do with each namespace.
  SynchronizationPlan<Namespace> plan =
      syncPlanner.planNamespaceSync(catalogName, parentNamespace, sourceChildren, targetChildren);

  int processed = 0;
  int plannedTotal = totalSyncsToComplete(plan);

  // Namespaces the plan reports as not modified are only logged.
  plan.entitiesNotModified()
      .forEach(
          untouched -> {
            clientLogger.info(
                "No change detected for namespace {} in namespace {} for catalog {}, skipping.",
                untouched,
                parentNamespace,
                catalogName);
          });

  // Step 4: create namespaces on the target using the metadata loaded from the source.
  for (Namespace toCreate : plan.entitiesToCreate()) {
    try {
      Map<String, String> sourceProperties =
          sourceIcebergCatalogService.loadNamespaceMetadata(toCreate);
      targetIcebergCatalogService.createNamespace(toCreate, sourceProperties);
      processed++;
      clientLogger.info(
          "Created namespace {} in namespace {} for catalog {} - {}/{}",
          toCreate,
          parentNamespace,
          catalogName,
          processed,
          plannedTotal);
    } catch (Exception createFailure) {
      if (haltOnFailure) {
        throw createFailure;
      }
      processed++;
      clientLogger.error(
          "Failed to create namespace {} in namespace {} for catalog {} - {}/{}",
          toCreate,
          parentNamespace,
          catalogName,
          processed,
          plannedTotal,
          createFailure);
    }
  }

  // Step 5: overwrite namespace properties on the target with the source metadata.
  for (Namespace toOverwrite : plan.entitiesToOverwrite()) {
    try {
      Map<String, String> sourceProperties =
          sourceIcebergCatalogService.loadNamespaceMetadata(toOverwrite);

      // In diff-only mode the target metadata is loaded as well, so that an identical namespace
      // can be skipped without writing to the target.
      boolean metadataUnchanged = false;
      if (this.diffOnly) {
        Map<String, String> targetProperties =
            targetIcebergCatalogService.loadNamespaceMetadata(toOverwrite);
        metadataUnchanged = sourceProperties.equals(targetProperties);
      }

      if (metadataUnchanged) {
        processed++;
        clientLogger.info(
            "Namespace metadata for namespace {} in namespace {} for catalog {} was not modified, skipping. - {}/{}",
            toOverwrite,
            parentNamespace,
            catalogName,
            processed,
            plannedTotal);
      } else {
        targetIcebergCatalogService.setNamespaceProperties(toOverwrite, sourceProperties);
        processed++;
        clientLogger.info(
            "Overwrote namespace metadata {} in namespace {} for catalog {} - {}/{}",
            toOverwrite,
            parentNamespace,
            catalogName,
            processed,
            plannedTotal);
      }
    } catch (Exception overwriteFailure) {
      if (haltOnFailure) {
        throw overwriteFailure;
      }
      processed++;
      clientLogger.error(
          "Failed to overwrite namespace metadata {} in namespace {} for catalog {} - {}/{}",
          toOverwrite,
          parentNamespace,
          catalogName,
          processed,
          plannedTotal,
          overwriteFailure);
    }
  }

  // Step 6: drop the namespaces the plan marks for removal from the target.
  for (Namespace toRemove : plan.entitiesToRemove()) {
    try {
      targetIcebergCatalogService.dropNamespaceCascade(toRemove);
      processed++;
      clientLogger.info(
          "Removed namespace {} in namespace {} for catalog {} - {}/{}",
          toRemove,
          parentNamespace,
          catalogName,
          processed,
          plannedTotal);
    } catch (Exception removeFailure) {
      if (haltOnFailure) {
        throw removeFailure;
      }
      processed++;
      clientLogger.error(
          "Failed to remove namespace {} in namespace {} for catalog {} - {}/{}",
          toRemove,
          parentNamespace,
          catalogName,
          processed,
          plannedTotal,
          removeFailure);
    }
  }

  // Step 7: descend into each namespace flagged for child sync - tables first, then namespaces.
  for (Namespace child : plan.entitiesToSyncChildren()) {
    syncTables(catalogName, child, sourceIcebergCatalogService, targetIcebergCatalogService);
    syncNamespaces(catalogName, child, sourceIcebergCatalogService, targetIcebergCatalogService);
  }
}