public static void main()

in xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java [113:200]


  /**
   * Command-line entry point: reads the catalog sync configuration and runs one sync per dataset
   * listed in it.
   *
   * <p>Steps, in order:
   *
   * <ol>
   *   <li>Parse {@code args} against {@code OPTIONS}. On a parse failure, or when {@code
   *       HELP_OPTION} is present, usage is printed and the method returns normally.
   *   <li>Deserialize the YAML file named by {@code CATALOG_SOURCE_AND_TARGET_CONFIG_PATH} into a
   *       {@link DatasetConfig}.
   *   <li>Build the Hadoop configuration and the table format converter configuration from the
   *       values resolved for {@code HADOOP_CONFIG_PATH} and {@code CONVERTERS_CONFIG_PATH}.
   *   <li>For every dataset, create one {@link TargetTable} per target identifier, group the
   *       target catalogs by target table, and call {@code
   *       ConversionController.syncTableAcrossCatalogs} with {@code SyncMode.INCREMENTAL}.
   * </ol>
   *
   * <p>An exception thrown by the sync call itself is logged and the loop moves on to the next
   * dataset. Exceptions raised anywhere else (reading the config, resolving the source table,
   * building targets) propagate out of this method.
   *
   * @param args command-line arguments, parsed against {@code OPTIONS}
   * @throws IllegalArgumentException if a target identifier names a catalog id that has no entry
   *     among the configured target catalogs
   * @throws Exception if the configuration cannot be read or parsed, or if any helper outside the
   *     per-dataset sync call fails
   */
  public static void main(String[] args) throws Exception {
    CommandLineParser parser = new DefaultParser();
    CommandLine cmd;
    try {
      cmd = parser.parse(OPTIONS, args);
    } catch (ParseException e) {
      // Report what was wrong with the arguments before showing usage; otherwise the user only
      // sees the help text with no hint about the offending option.
      System.err.println("Failed to parse command line arguments: " + e.getMessage());
      new HelpFormatter().printHelp("xtable.jar", OPTIONS, true);
      return;
    }

    if (cmd.hasOption(HELP_OPTION)) {
      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp("RunCatalogSync", OPTIONS);
      return;
    }

    DatasetConfig datasetConfig;
    try (InputStream inputStream =
        Files.newInputStream(
            Paths.get(cmd.getOptionValue(CATALOG_SOURCE_AND_TARGET_CONFIG_PATH)))) {
      datasetConfig = YAML_MAPPER.readValue(inputStream, DatasetConfig.class);
    }
    String hadoopConfigpath = getValueFromConfig(cmd, HADOOP_CONFIG_PATH);
    Configuration hadoopConf = loadHadoopConf(getCustomConfigurations(hadoopConfigpath));
    String conversionProviderConfigpath = getValueFromConfig(cmd, CONVERTERS_CONFIG_PATH);
    TableFormatConverters tableFormatConverters =
        loadTableFormatConversionConfigs(getCustomConfigurations(conversionProviderConfigpath));

    // Index the configured target catalogs by id so target identifiers can be resolved below.
    Map<String, ExternalCatalogConfig> catalogsById =
        datasetConfig.getTargetCatalogs().stream()
            .collect(Collectors.toMap(ExternalCatalogConfig::getCatalogId, Function.identity()));
    Optional<CatalogConversionSource> catalogConversionSource =
        getCatalogConversionSource(datasetConfig.getSourceCatalog(), hadoopConf);
    ConversionController conversionController = new ConversionController(hadoopConf);
    for (DatasetConfig.Dataset dataset : datasetConfig.getDatasets()) {
      SourceTable sourceTable =
          getSourceTable(dataset.getSourceCatalogTableIdentifier(), catalogConversionSource);
      List<TargetTable> targetTables = new ArrayList<>();
      Map<TargetTable, List<TargetCatalogConfig>> targetCatalogs = new HashMap<>();
      for (TargetTableIdentifier targetCatalogTableIdentifier :
          dataset.getTargetCatalogTableIdentifiers()) {
        // Fail with a descriptive message instead of handing a null catalog config to the
        // builder when the identifier references a catalog id that was never declared.
        ExternalCatalogConfig catalogConfig =
            catalogsById.get(targetCatalogTableIdentifier.getCatalogId());
        if (catalogConfig == null) {
          throw new IllegalArgumentException(
              "Target catalog id '"
                  + targetCatalogTableIdentifier.getCatalogId()
                  + "' is not defined in the target catalogs of the sync configuration");
        }
        TargetTable targetTable =
            TargetTable.builder()
                .name(sourceTable.getName())
                .basePath(
                    getSourceTableLocation(
                        targetCatalogTableIdentifier.getTableFormat(), sourceTable))
                .namespace(sourceTable.getNamespace())
                .formatName(targetCatalogTableIdentifier.getTableFormat())
                .additionalProperties(sourceTable.getAdditionalProperties())
                .build();
        // Register a target table only the first time it is seen as a map key. Further
        // identifiers that resolve to an equal table just add another catalog to its list, so
        // the table is not listed twice in targetTables.
        List<TargetCatalogConfig> catalogsForTable = targetCatalogs.get(targetTable);
        if (catalogsForTable == null) {
          catalogsForTable = new ArrayList<>();
          targetCatalogs.put(targetTable, catalogsForTable);
          targetTables.add(targetTable);
        }
        catalogsForTable.add(
            TargetCatalogConfig.builder()
                .catalogTableIdentifier(
                    getCatalogTableIdentifier(targetCatalogTableIdentifier.getTableIdentifier()))
                .catalogConfig(catalogConfig)
                .build());
      }
      ConversionConfig conversionConfig =
          ConversionConfig.builder()
              .sourceTable(sourceTable)
              .targetTables(targetTables)
              .targetCatalogs(targetCatalogs)
              .syncMode(SyncMode.INCREMENTAL)
              .build();
      // Source format first, then every target format, without repeats.
      List<String> tableFormats =
          Stream.concat(
                  Stream.of(sourceTable.getFormatName()),
                  targetTables.stream().map(TargetTable::getFormatName))
              .distinct()
              .collect(Collectors.toList());
      try {
        conversionController.syncTableAcrossCatalogs(
            conversionConfig,
            getConversionSourceProviders(tableFormats, tableFormatConverters, hadoopConf));
      } catch (Exception e) {
        // Best effort: a failure for one dataset must not stop the remaining datasets.
        log.error("Error running sync for {}", sourceTable.getBasePath(), e);
      }
    }
  }