protected List<StatisticsFile> apply(Transaction transaction)

in amoro-format-iceberg/src/main/java/org/apache/amoro/op/OverwriteBaseFiles.java [162:308]


  /**
   * Applies the pending base-store changes through {@code transaction} and builds the statistics
   * files that carry the per-partition optimized sequence and optimized time.
   *
   * <p>The work is done in three steps:
   *
   * <ol>
   *   <li>Added/deleted data files are committed with a single {@link OverwriteFiles} operation
   *       (skipped when there are none).
   *   <li>Delete files are committed with a {@link RowDelta} when delete files are only added, or
   *       with a {@link RewriteFiles} when existing delete files are removed as well.
   *   <li>If at least one operation was committed, the previously recorded values read from the
   *       keyed table are merged with the values of the changed partitions. A statistics file is
   *       written for the first new snapshot and copied to every further new snapshot.
   * </ol>
   *
   * @param transaction the transaction all operations are created from
   * @return one statistics file per snapshot event produced here; an empty list when no operation
   *     was committed
   * @throws IllegalStateException if neither {@code updateOptimizedSequence()} nor {@code
   *     updateOptimizedSequenceDynamically()} was invoked beforehand
   */
  protected List<StatisticsFile> apply(Transaction transaction) {
    Preconditions.checkState(
        this.dynamic != null,
        "updateOptimizedSequence() or updateOptimizedSequenceDynamically() must be invoked");
    applyDeleteExpression();

    // Only the dynamic mode collects the partitions touched by the files committed below.
    final StructLikeMap<Long> touchedPartitions =
        this.dynamic ? StructLikeMap.create(transaction.table().spec().partitionType()) : null;

    UnkeyedTable baseTable = keyedTable.baseTable();
    List<CreateSnapshotEvent> snapshotEvents = Lists.newArrayList();

    // step1: overwrite data files
    boolean hasDataFileChanges = !(this.addFiles.isEmpty() && this.deleteFiles.isEmpty());
    if (hasDataFileChanges) {
      OverwriteFiles overwrite = transaction.newOverwrite();
      overwrite.set(MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true");
      overwrite.set(MixedTableUtil.BLOB_TYPE_BASE_OPTIMIZED_TIME_EXIST, "true");

      if (conflictDetectionFilter != null && baseTable.currentSnapshot() != null) {
        overwrite.conflictDetectionFilter(conflictDetectionFilter).validateNoConflictingData();
        overwrite.validateFromSnapshot(baseTable.currentSnapshot().snapshotId());
      }
      if (this.dynamic) {
        this.addFiles.forEach(
            file -> touchedPartitions.put(file.partition(), this.optimizedSequence));
        this.deleteFiles.forEach(
            file -> touchedPartitions.put(file.partition(), this.optimizedSequence));
      }
      for (DataFile added : this.addFiles) {
        overwrite.addFile(added);
      }
      for (DataFile removed : this.deleteFiles) {
        overwrite.deleteFile(removed);
      }
      if (this.optimizedSequence != null && this.optimizedSequence > 0) {
        overwrite.set(PROPERTIES_TRANSACTION_ID, String.valueOf(this.optimizedSequence));
      }

      // User supplied properties are applied last, after the properties set above.
      if (MapUtils.isNotEmpty(properties)) {
        properties.forEach((key, value) -> overwrite.set(key, value));
      }
      overwrite.commit();
      snapshotEvents.add((CreateSnapshotEvent) overwrite.updateEvent());
    }

    // step2: RowDelta/Rewrite pos-delete files
    if (CollectionUtils.isNotEmpty(deleteDeleteFiles)) {
      // Existing delete files are removed as well: this needs a rewrite operation.
      RewriteFiles rewrite = transaction.newRewrite();
      rewrite.set(MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true");
      rewrite.set(MixedTableUtil.BLOB_TYPE_BASE_OPTIMIZED_TIME_EXIST, "true");
      if (baseTable.currentSnapshot() != null) {
        rewrite.validateFromSnapshot(baseTable.currentSnapshot().snapshotId());
      }

      if (this.dynamic) {
        this.addDeleteFiles.forEach(
            file -> touchedPartitions.put(file.partition(), this.optimizedSequence));
        this.deleteDeleteFiles.forEach(
            file -> touchedPartitions.put(file.partition(), this.optimizedSequence));
      }
      rewrite.rewriteFiles(
          Collections.emptySet(),
          new HashSet<>(deleteDeleteFiles),
          Collections.emptySet(),
          new HashSet<>(addDeleteFiles));
      if (MapUtils.isNotEmpty(properties)) {
        properties.forEach((key, value) -> rewrite.set(key, value));
      }
      rewrite.commit();
      snapshotEvents.add((CreateSnapshotEvent) rewrite.updateEvent());
    } else if (CollectionUtils.isNotEmpty(addDeleteFiles)) {
      // Delete files are only added: a row delta is sufficient.
      RowDelta delta = transaction.newRowDelta();
      delta.set(MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE_EXIST, "true");
      delta.set(MixedTableUtil.BLOB_TYPE_BASE_OPTIMIZED_TIME_EXIST, "true");
      if (baseTable.currentSnapshot() != null) {
        delta.validateFromSnapshot(baseTable.currentSnapshot().snapshotId());
      }

      if (this.dynamic) {
        this.addDeleteFiles.forEach(
            file -> touchedPartitions.put(file.partition(), this.optimizedSequence));
      }

      for (DeleteFile addedDelete : addDeleteFiles) {
        delta.addDeletes(addedDelete);
      }
      if (MapUtils.isNotEmpty(properties)) {
        properties.forEach((key, value) -> delta.set(key, value));
      }
      delta.commit();
      snapshotEvents.add((CreateSnapshotEvent) delta.updateEvent());
    }

    // Nothing was committed, so there is no snapshot to attach statistics to.
    if (snapshotEvents.isEmpty()) {
      return Collections.emptyList();
    }

    // step3: set optimized sequence id, optimized time
    long commitTime = System.currentTimeMillis();
    PartitionSpec spec = transaction.table().spec();
    StructLikeMap<Long> previousSequence = MixedTableUtil.readOptimizedSequence(keyedTable);
    StructLikeMap<Long> previousTime = MixedTableUtil.readBaseOptimizedTime(keyedTable);
    StructLikeMap<Long> mergedSequence = StructLikeMap.create(spec.partitionType());
    StructLikeMap<Long> mergedTime = StructLikeMap.create(spec.partitionType());
    // Start from the previously recorded values, then override the changed partitions.
    if (previousSequence != null) {
      mergedSequence.putAll(previousSequence);
    }
    if (previousTime != null) {
      mergedTime.putAll(previousTime);
    }
    StructLikeMap<Long> changedPartitionSequence =
        this.dynamic ? touchedPartitions : this.partitionOptimizedSequence;
    changedPartitionSequence.forEach(
        (partition, sequence) -> {
          mergedSequence.put(partition, sequence);
          mergedTime.put(partition, commitTime);
        });

    List<StatisticsFile> statisticsFiles = Lists.newArrayList();
    StatisticsFile written = null;
    for (CreateSnapshotEvent event : snapshotEvents) {
      if (written == null) {
        // The statistics content is written once, for the first new snapshot.
        Table table = transaction.table();
        StatisticsFileUtil.PartitionDataSerializer<Long> serializer =
            StatisticsFileUtil.createPartitionDataSerializer(table.spec(), Long.class);
        written =
            StatisticsFileUtil.writerBuilder(table)
                .withSnapshotId(event.snapshotId())
                .build()
                .add(MixedTableUtil.BLOB_TYPE_OPTIMIZED_SEQUENCE, mergedSequence, serializer)
                .add(MixedTableUtil.BLOB_TYPE_BASE_OPTIMIZED_TIME, mergedTime, serializer)
                .complete();
        statisticsFiles.add(written);
        continue;
      }
      // Every further snapshot gets a copy of the statistics file already written.
      statisticsFiles.add(StatisticsFileUtil.copyToSnapshot(written, event.snapshotId()));
    }
    return statisticsFiles;
  }