in amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/utils/HiveMetaSynchronizer.java [185:272]
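/**
 * Synchronizes data files from the Hive table into the mixed-format table, adding files that
 * appeared in Hive and removing files that no longer exist there via an overwrite commit.
 *
 * @param table the mixed-hive table to synchronize
 * @param hiveClient pooled Hive Metastore client used to read table and partition metadata
 * @param force when true, rewrite files even when no modification is detected
 */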
public static void syncHiveDataToMixedTable(
SupportHive table, HMSClientPool hiveClient, boolean force) {
if (!HiveTableUtil.checkExist(hiveClient, table.id())) {
LOG.warn("Hive table {} does not exist, try to skip sync data to amoro", table.id());
return;
}
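// for keyed tables the data files are tracked by the base store, so sync against it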
UnkeyedTable baseStore;
if (table.isKeyedTable()) {
baseStore = table.asKeyedTable().baseTable();
} else {
baseStore = table.asUnkeyedTable();
}
try {
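// unpartitioned tables are synced as a whole: replace every tracked file in one overwrite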
if (table.spec().isUnpartitioned()) {
Table hiveTable =
hiveClient.run(
client -> client.getTable(table.id().getDatabase(), table.id().getTableName()));
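// skip the rewrite unless forced or the Hive table changed since the last sync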
if (force || tableHasModified(baseStore, hiveTable)) {
List<DataFile> hiveDataFiles =
listHivePartitionFiles(table, Maps.newHashMap(), hiveTable.getSd().getLocation());
List<DataFile> deleteFiles = Lists.newArrayList();
try (CloseableIterable<FileScanTask> fileScanTasks = baseStore.newScan().planFiles()) {
fileScanTasks.forEach(fileScanTask -> deleteFiles.add(fileScanTask.file()));
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table scan of " + table.name(), e);
}
overwriteTable(table, deleteFiles, hiveDataFiles);
}
} else {
// list all hive partitions.
List<Partition> hivePartitions =
hiveClient.run(
client ->
client.listPartitions(
table.id().getDatabase(), table.id().getTableName(), Short.MAX_VALUE));
// group mixed-hive table files by partition.
StructLikeMap<Collection<DataFile>> filesGroupedByPartition =
StructLikeMap.create(table.spec().partitionType());
TableScan tableScan = baseStore.newScan();
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
for (FileScanTask fileScanTask : fileScanTasks) {
filesGroupedByPartition
.computeIfAbsent(fileScanTask.file().partition(), k -> Lists.newArrayList())
.add(fileScanTask.file());
}
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table scan of " + table.name(), e);
}
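// diff each Hive partition against the files currently tracked by the base store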
List<DataFile> filesToDelete = Lists.newArrayList();
List<DataFile> filesToAdd = Lists.newArrayList();
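// partitions left in this list after the loop below exist only on the iceberg side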
List<StructLike> icebergPartitions = Lists.newArrayList(filesGroupedByPartition.keySet());
for (Partition hivePartition : hivePartitions) {
StructLike partitionData =
HivePartitionUtil.buildPartitionData(hivePartition.getValues(), table.spec());
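// this partition still exists in Hive, so it is not a candidate for removal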
icebergPartitions.remove(partitionData);
if (force || partitionHasModified(baseStore, hivePartition, partitionData)) {
List<DataFile> hiveDataFiles =
listHivePartitionFiles(
table,
buildPartitionValueMap(hivePartition.getValues(), table.spec()),
hivePartition.getSd().getLocation());
if (filesGroupedByPartition.get(partitionData) != null) {
filesToDelete.addAll(filesGroupedByPartition.get(partitionData));
filesToAdd.addAll(hiveDataFiles);
// only add new partitions created externally by Hive, not by the mixed-hive table itself
} else if (!CompatibleHivePropertyUtil.propertyAsBoolean(
hivePartition.getParameters(), HiveTableProperties.MIXED_TABLE_FLAG, false)) {
filesToAdd.addAll(hiveDataFiles);
}
}
}
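// partitions dropped from Hive: remove their files once the underlying data is gone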
icebergPartitions.forEach(
partition -> {
List<DataFile> dataFiles = Lists.newArrayList(filesGroupedByPartition.get(partition));
if (!dataFiles.isEmpty()) {
// only delete the tracked files if the backing data was actually removed from storage
if (!table.io().exists(dataFiles.get(0).path().toString())) {
filesToDelete.addAll(filesGroupedByPartition.get(partition));
}
}
});
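// commit the collected deletes and adds as a single overwrite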
overwriteTable(table, filesToDelete, filesToAdd);
}
} catch (TException | InterruptedException e) {
throw new RuntimeException("Failed to get hive table: " + table.id(), e);
}
}