in xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java [113:200]
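/**
 * Entry point: parses command line options, reads the source/target catalog and
 * dataset configuration from YAML, and runs an incremental sync of each dataset
 * across all of its configured target catalogs.
 */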
public static void main(String[] args) throws Exception {
  CommandLineParser parser = new DefaultParser();
  CommandLine cmd;
  try {
    cmd = parser.parse(OPTIONS, args);
  } catch (ParseException e) {
    new HelpFormatter().printHelp("RunCatalogSync", OPTIONS, true);
    return;
  }
  if (cmd.hasOption(HELP_OPTION)) {
    new HelpFormatter().printHelp("RunCatalogSync", OPTIONS, true);
    return;
  }
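  // Load the dataset configuration from the YAML file passed on the command line.
  // Illustrative shape only: the key names follow the getters used in this method,
  // but the values and any elided fields are hypothetical.
  //
  //   sourceCatalog:
  //     catalogId: "source-catalog"
  //     ...
  //   targetCatalogs:
  //     - catalogId: "target-catalog-1"
  //       ...
  //   datasets:
  //     - sourceCatalogTableIdentifier: ...
  //       targetCatalogTableIdentifiers:
  //         - catalogId: "target-catalog-1"
  //           tableFormat: "ICEBERG"
  //           tableIdentifier: ...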
  DatasetConfig datasetConfig;
  try (InputStream inputStream =
      Files.newInputStream(
          Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
    datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
  }
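  // Build the Hadoop configuration, applying any custom settings file supplied
  // on the command line.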
  String hadoopConfigPath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
  byte[] customConfig = getCustomConfigurations(hadoopConfigPath);
  Configuration hadoopConf = loadHadoopConf(customConfig);
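  // Load the converter configuration; it is used below to resolve a conversion
  // source provider for each table format involved in a sync.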
  String conversionProviderConfigPath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH);
  customConfig = getCustomConfigurations(conversionProviderConfigPath);
  TableFormatConverters tableFormatConverters = loadTableFormatConversionConfigs(customConfig);
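  // Index target catalogs by id for lookup when resolving each dataset's targets.
  // Note: Collectors.toMap throws IllegalStateException if two catalogs share an id.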
  Map<String, ExternalCatalogConfig> catalogsById =
      datasetConfig.getTargetCatalogs().stream()
          .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity()));
  Optional<CatalogConversionSource> catalogConversionSource =
      getCatalogConversionSource(datasetConfig.getSourceCatalog(), hadoopConf);
  ConversionController conversionController = new ConversionController(hadoopConf);
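  // Sync each dataset: resolve its source table, build one TargetTable per
  // requested output format, and fan the result out to every target catalog.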
  for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
    SourceTable sourceTable =
        getSourceTable(dataset.getSourceCatalogTableIdentifier(), catalogConversionSource);
    List<TargetTable> targetTables = new ArrayList<>();
    Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>();
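    // Group target catalog configs by TargetTable; catalogs requesting the same
    // format share one map entry (assuming TargetTable compares by value).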
    for (TargetTableIdentifier targetCatalogTableIdentifier :
        dataset.getTargetCatalogTableIdentifiers()) {
      TargetTable targetTable =
          TargetTable.builder()
              .name(sourceTable.getName())
              .basePath(
                  getSourceTableLocation(
                      targetCatalogTableIdentifier.getTableFormat(), sourceTable))
              .namespace(sourceTable.getNamespace())
              .formatName(targetCatalogTableIdentifier.getTableFormat())
              .additionalProperties(sourceTable.getAdditionalProperties())
              .build();
      targetTables.add(targetTable);
      targetCatalogs
          .computeIfAbsent(targetTable, key -> new ArrayList<>())
          .add(
              TargetCatalogConfig.builder()
                  .catalogTableIdentifier(
                      getCatalogTableIdentifier(
                          targetCatalogTableIdentifier.getTableIdentifier()))
                  .catalogConfig(catalogsById.get(targetCatalogTableIdentifier.getCatalogId()))
                  .build());
    }
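    // Assemble the conversion request. INCREMENTAL mode syncs only changes made
    // since the previous run, rather than rewriting the full table metadata.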
    ConversionConfig conversionConfig =
        ConversionConfig.builder()
            .sourceTable(sourceTable)
            .targetTables(targetTables)
            .targetCatalogs(targetCatalogs)
            .syncMode(SyncMode.INCREMENTAL)
            .build();
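    // Every distinct format involved in this dataset (source plus all targets)
    // needs a conversion source provider.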
    List<String> tableFormats =
        Stream.concat(
                Stream.of(sourceTable.getFormatName()),
                targetTables.stream().map(TargetTable::getFormatName))
            .distinct()
            .collect(Collectors.toList());
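    // A failure on one dataset is logged and skipped so the remaining datasets
    // still get synced.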
    try {
      conversionController.syncTableAcrossCatalogs(
          conversionConfig,
          getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
    } catch (Exception e) {
      log.error("Error running sync for {}", sourceTable.getBasePath(), e);
    }
  }
}