in xtable-core/src/main/java/org/apache/xtable/conversion/ConversionController.java [174:220]
/**
 * Syncs the source into every configured target table and returns the merged sync results.
 *
 * <p>Target tables whose format name equals the source table's format name are left out. A
 * conversion target is created for each remaining table and its table metadata is read. {@code
 * getFormatsToSyncIncrementally} then selects the formats handled by {@code
 * syncIncrementalChanges}; every other format is handled by {@code syncSnapshot}. The snapshot
 * pass runs first, followed by the incremental pass, and either pass is skipped when it has no
 * formats assigned to it.
 *
 * @param config conversion configuration supplying the source table and the target tables
 * @param source extractor handed to both sync passes; its conversion source is also used when
 *     selecting the formats to sync incrementally
 * @param syncMode sync mode forwarded to {@code getFormatsToSyncIncrementally}
 * @param <COMMIT> commit type of the source extractor
 * @return the results of the incremental pass overlaid with the results of the snapshot pass
 */
private <COMMIT> Map<String, SyncResult> syncTableFormats(
    ConversionConfig config, ExtractFromSource<COMMIT> source, SyncMode syncMode) {
  // One conversion target per format name, ignoring targets that share the source's format.
  Map<String, ConversionTarget> targetsByFormat =
      config.getTargetTables().stream()
          .filter(
              table -> !table.getFormatName().equals(config.getSourceTable().getFormatName()))
          .collect(
              Collectors.toMap(
                  table -> table.getFormatName(),
                  table -> conversionTargetFactory.createForFormat(table, conf)));

  // Table metadata as reported by each target before any sync is attempted.
  Map<String, Optional<TableSyncMetadata>> priorMetadataByFormat =
      targetsByFormat.entrySet().stream()
          .collect(
              Collectors.toMap(
                  formatAndTarget -> formatAndTarget.getKey(),
                  formatAndTarget -> formatAndTarget.getValue().getTableMetadata()));

  Map<String, ConversionTarget> incrementalTargets =
      getFormatsToSyncIncrementally(
          syncMode, targetsByFormat, priorMetadataByFormat, source.getConversionSource());

  // Whatever was not selected for incremental sync is synced by snapshot.
  Map<String, ConversionTarget> snapshotTargets = new HashMap<>();
  for (Map.Entry<String, ConversionTarget> formatAndTarget : targetsByFormat.entrySet()) {
    if (!incrementalTargets.containsKey(formatAndTarget.getKey())) {
      snapshotTargets.put(formatAndTarget.getKey(), formatAndTarget.getValue());
    }
  }

  // Snapshot pass goes first; an empty result stands in when there is nothing to do.
  SyncResultForTableFormats snapshotOutcome;
  if (snapshotTargets.isEmpty()) {
    snapshotOutcome = SyncResultForTableFormats.builder().build();
  } else {
    snapshotOutcome = syncSnapshot(snapshotTargets, source);
  }

  SyncResultForTableFormats incrementalOutcome;
  if (incrementalTargets.isEmpty()) {
    incrementalOutcome = SyncResultForTableFormats.builder().build();
  } else {
    incrementalOutcome =
        syncIncrementalChanges(incrementalTargets, priorMetadataByFormat, source);
  }

  // Incremental results are copied first, then snapshot results are laid on top of them.
  Map<String, SyncResult> mergedResults = new HashMap<>(incrementalOutcome.getLastSyncResult());
  mergedResults.putAll(snapshotOutcome.getLastSyncResult());

  String succeededFormats = getFormatsWithStatusCode(mergedResults, SyncStatusCode.SUCCESS);
  if (!succeededFormats.isEmpty()) {
    log.info("Sync is successful for the following formats {}", succeededFormats);
  }

  String erroredFormats = getFormatsWithStatusCode(mergedResults, SyncStatusCode.ERROR);
  if (!erroredFormats.isEmpty()) {
    log.error("Sync failed for the following formats {}", erroredFormats);
  }

  return mergedResults;
}