private Map syncTableFormats()

in xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java [174:220]


  private <COMMIT> Map<String, SyncResult> syncTableFormats(
      ConversionConfig config, ExtractFromSource<COMMIT> source, SyncMode syncMode) {
    Map<String, ConversionTarget> conversionTargetByFormat =
        config.getTargetTables().stream()
            .filter(
                targetTable ->
                    !targetTable.getFormatName().equals(config.getSourceTable().getFormatName()))
            .collect(
                Collectors.toMap(
                    TargetTable::getFormatName,
                    targetTable -> conversionTargetFactory.createForFormat(targetTable, conf)));

    Map<String, Optional<TableSyncMetadata>> lastSyncMetadataByFormat =
        conversionTargetByFormat.entrySet().stream()
            .collect(
                Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getTableMetadata()));
    Map<String, ConversionTarget> formatsToSyncIncrementally =
        getFormatsToSyncIncrementally(
            syncMode,
            conversionTargetByFormat,
            lastSyncMetadataByFormat,
            source.getConversionSource());
    Map<String, ConversionTarget> formatsToSyncBySnapshot =
        conversionTargetByFormat.entrySet().stream()
            .filter(entry -> !formatsToSyncIncrementally.containsKey(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    SyncResultForTableFormats syncResultForSnapshotSync =
        formatsToSyncBySnapshot.isEmpty()
            ? SyncResultForTableFormats.builder().build()
            : syncSnapshot(formatsToSyncBySnapshot, source);
    SyncResultForTableFormats syncResultForIncrementalSync =
        formatsToSyncIncrementally.isEmpty()
            ? SyncResultForTableFormats.builder().build()
            : syncIncrementalChanges(formatsToSyncIncrementally, lastSyncMetadataByFormat, source);
    Map<String, SyncResult> syncResultsMerged =
        new HashMap<>(syncResultForIncrementalSync.getLastSyncResult());
    syncResultsMerged.putAll(syncResultForSnapshotSync.getLastSyncResult());
    String successfulSyncs = getFormatsWithStatusCode(syncResultsMerged, SyncStatusCode.SUCCESS);
    if (!successfulSyncs.isEmpty()) {
      log.info("Sync is successful for the following formats {}", successfulSyncs);
    }
    String failedSyncs = getFormatsWithStatusCode(syncResultsMerged, SyncStatusCode.ERROR);
    if (!failedSyncs.isEmpty()) {
      log.error("Sync failed for the following formats {}", failedSyncs);
    }
    return syncResultsMerged;
  }