in xtable-hudi-support/xtable-hudi-support-extensions/src/main/java/org/apache/xtable/hudi/sync/XTableSyncTool.java [66:117]
public void syncHoodieTable() {
  // Converts the Hudi table located at META_SYNC_BASE_PATH into every target format listed in
  // XTABLE_FORMATS using an incremental sync. Throws a HoodieException naming each format whose
  // sync result did not report SUCCESS.

  // Normalize the comma-separated format list: strip whitespace around each entry so values
  // such as "DELTA, ICEBERG" are accepted, and upper-case with Locale.ROOT so the outcome does
  // not depend on the JVM default locale (e.g. the Turkish dotted/dotless 'i' mapping).
  List<String> formatsToSync =
      Arrays.stream(config.getString(XTableSyncConfig.XTABLE_FORMATS).split(","))
          .map(format -> format.trim().toUpperCase(java.util.Locale.ROOT))
          .collect(Collectors.toList());
  String basePath = config.getString(HoodieSyncConfig.META_SYNC_BASE_PATH);
  String tableName = config.getString(HoodieTableConfig.HOODIE_TABLE_NAME_KEY);

  // The source is always the Hudi table itself; the partition spec is handed over as an
  // additional source property.
  Properties sourceProperties = new Properties();
  sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, getPartitionSpecConfig());
  SourceTable sourceTable =
      SourceTable.builder()
          .name(tableName)
          .formatName(HUDI)
          .basePath(basePath)
          .additionalProperties(sourceProperties)
          .build();

  // Metadata retention is optional: null is passed through when the hours setting is absent.
  Duration metadataRetention =
      config.contains(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS)
          ? Duration.ofHours(
              config.getInt(XTableSyncConfig.XTABLE_TARGET_METADATA_RETENTION_HOURS))
          : null;

  // One target per requested format, all sharing the source's base path and table name.
  List<TargetTable> targetTables =
      formatsToSync.stream()
          .map(
              format ->
                  TargetTable.builder()
                      .basePath(basePath)
                      .metadataRetention(metadataRetention)
                      .formatName(format)
                      .name(tableName)
                      .build())
          .collect(Collectors.toList());
  ConversionConfig conversionConfig =
      ConversionConfig.builder()
          .sourceTable(sourceTable)
          .targetTables(targetTables)
          .syncMode(SyncMode.INCREMENTAL)
          .build();
  Map<String, SyncResult> results =
      new ConversionController(hadoopConf).sync(conversionConfig, hudiConversionSourceProvider);

  // Collect the keys of every result whose status code is anything other than SUCCESS and
  // report them together in a single exception.
  String failingFormats =
      results.entrySet().stream()
          .filter(
              entry ->
                  entry.getValue().getTableFormatSyncStatus().getStatusCode()
                      != SyncStatusCode.SUCCESS)
          .map(Map.Entry::getKey)
          .collect(Collectors.joining(","));
  if (!failingFormats.isEmpty()) {
    throw new HoodieException("Unable to sync to InternalTable for formats: " + failingFormats);
  }
}