public static void syncHiveDataToMixedTable()

in amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/utils/HiveMetaSynchronizer.java [185:272]


  public static void syncHiveDataToMixedTable(
      SupportHive table, HMSClientPool hiveClient, boolean force) {
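    // nothing to sync if the backing Hive table has been dropped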
    if (!HiveTableUtil.checkExist(hiveClient, table.id())) {
      LOG.warn("Hive table {} does not exist, try to skip sync data to amoro", table.id());
      return;
    }
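    // keyed tables receive synced Hive data in their base store; unkeyed tables are their own base store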
    UnkeyedTable baseStore;
    if (table.isKeyedTable()) {
      baseStore = table.asKeyedTable().baseTable();
    } else {
      baseStore = table.asUnkeyedTable();
    }
    try {
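      // unpartitioned tables: compare and overwrite the single table location as a whole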
      if (table.spec().isUnpartitioned()) {
        Table hiveTable =
            hiveClient.run(
                client -> client.getTable(table.id().getDatabase(), table.id().getTableName()));
        if (force || tableHasModified(baseStore, hiveTable)) {
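          // the Hive location is the source of truth: its current files become the new table contents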
          List<DataFile> hiveDataFiles =
              listHivePartitionFiles(table, Maps.newHashMap(), hiveTable.getSd().getLocation());
          List<DataFile> deleteFiles = Lists.newArrayList();
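          // collect every live data file in the base store; they will all be replaced by the Hive files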
          try (CloseableIterable<FileScanTask> fileScanTasks = baseStore.newScan().planFiles()) {
            fileScanTasks.forEach(fileScanTask -> deleteFiles.add(fileScanTask.file()));
          } catch (IOException e) {
            throw new UncheckedIOException("Failed to close table scan of " + table.name(), e);
          }
          overwriteTable(table, deleteFiles, hiveDataFiles);
        }
      } else {
        // list all hive partitions.
        List<Partition> hivePartitions =
            hiveClient.run(
                client ->
                    client.listPartitions(
                        table.id().getDatabase(), table.id().getTableName(), Short.MAX_VALUE));
        // group mixed-hive table files by partition.
        StructLikeMap<Collection<DataFile>> filesGroupedByPartition =
            StructLikeMap.create(table.spec().partitionType());
        TableScan tableScan = baseStore.newScan();
        try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
          for (FileScanTask fileScanTask : fileScanTasks) {
            filesGroupedByPartition
                .computeIfAbsent(fileScanTask.file().partition(), k -> Lists.newArrayList())
                .add(fileScanTask.file());
          }
        } catch (IOException e) {
          throw new UncheckedIOException("Failed to close table scan of " + table.name(), e);
        }
        List<DataFile> filesToDelete = Lists.newArrayList();
        List<DataFile> filesToAdd = Lists.newArrayList();
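        // partitions matched in Hive are removed from this list; the leftovers are handled after the loop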
        List<StructLike> icebergPartitions = Lists.newArrayList(filesGroupedByPartition.keySet());
        for (Partition hivePartition : hivePartitions) {
          StructLike partitionData =
              HivePartitionUtil.buildPartitionData(hivePartition.getValues(), table.spec());
          icebergPartitions.remove(partitionData);
          if (force || partitionHasModified(baseStore, hivePartition, partitionData)) {
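            // the partition changed on the Hive side: re-list its data files from the partition location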
            List<DataFile> hiveDataFiles =
                listHivePartitionFiles(
                    table,
                    buildPartitionValueMap(hivePartition.getValues(), table.spec()),
                    hivePartition.getSd().getLocation());
            if (filesGroupedByPartition.get(partitionData) != null) {
              filesToDelete.addAll(filesGroupedByPartition.get(partitionData));
              filesToAdd.addAll(hiveDataFiles);
            } else if (!CompatibleHivePropertyUtil.propertyAsBoolean(
                hivePartition.getParameters(), HiveTableProperties.MIXED_TABLE_FLAG, false)) {
              // only pick up new partitions that were not created by the mixed-hive table itself
              filesToAdd.addAll(hiveDataFiles);
            }
          }
        }

        // remaining partitions exist only on the iceberg side, i.e. they were dropped from Hive
        icebergPartitions.forEach(
            partition -> {
              List<DataFile> dataFiles = Lists.newArrayList(filesGroupedByPartition.get(partition));
              // only drop the partition's files if they no longer exist on the file system
              if (!dataFiles.isEmpty()
                  && !table.io().exists(dataFiles.get(0).path().toString())) {
                filesToDelete.addAll(dataFiles);
              }
            });
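        // apply the accumulated removals and additions in a single overwrite commit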
        overwriteTable(table, filesToDelete, filesToAdd);
      }
    } catch (TException e) {
      throw new RuntimeException("Failed to get hive table: " + table.id(), e);
    } catch (InterruptedException e) {
      // restore the interrupt flag so callers can still observe the interruption
      Thread.currentThread().interrupt();
      throw new RuntimeException("Interrupted while getting hive table: " + table.id(), e);
    }
  }
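
For context, a minimal sketch of how this method might be driven by a caller such as a periodic
sync job. The helpers loadSupportHiveTable() and newHiveClientPool() are hypothetical
placeholders for however the caller obtains the table handle and the HMS client pool; they are
not Amoro APIs.

  // hypothetical caller sketch; loadSupportHiveTable/newHiveClientPool are placeholders
  SupportHive table = loadSupportHiveTable("db_name", "table_name");
  HMSClientPool hiveClient = newHiveClientPool();

  // force = false re-lists only partitions that partitionHasModified reports as changed
  // (or the whole location for unpartitioned tables); force = true re-reads everything
  HiveMetaSynchronizer.syncHiveDataToMixedTable(table, hiveClient, false);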