public CompletableFuture getNextBatch()

in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergSplitSource.java [176:318]


  /**
   * Returns the next batch of Iceberg splits, consuming at most {@code maxSize} file scan tasks.
   *
   * <p>While the dynamic filter is still awaitable and the dynamic-filtering wait timeout has not
   * elapsed, nothing is planned. The returned future completes with an empty batch as soon as the
   * dynamic filter unblocks or the remaining wait time runs out, whichever happens first.
   *
   * <p>On the first call past that point, the Iceberg scan is planned:
   *
   * <ul>
   *   <li>the table's unenforced predicate is intersected with the current dynamic filter;
   *   <li>the result is simplified;
   *   <li>it is then intersected with the data-column predicate.
   * </ul>
   *
   * If the resulting predicate is {@code none}, the source is finished and {@code
   * NO_MORE_SPLITS_BATCH} is returned.
   *
   * <p>Every task pulled from the underlying iterator counts toward {@code maxSize}, including
   * tasks that are then pruned. Pruning happens by:
   *
   * <ul>
   *   <li>scanned-file size;
   *   <li>path;
   *   <li>file modification time;
   *   <li>the dynamic filter;
   *   <li>the partition constraint.
   * </ul>
   *
   * So a returned batch may hold fewer than {@code maxSize} splits, possibly none, while the
   * source is not yet finished.
   *
   * @param maxSize maximum number of file scan tasks to consume in this call
   * @return a future holding the splits that survived pruning, together with the current {@code
   *     isFinished()} state
   */
  public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize) {
    long timeLeft =
        dynamicFilteringWaitTimeoutMillis - dynamicFilterWaitStopwatch.elapsed(MILLISECONDS);
    if (dynamicFilter.isAwaitable() && timeLeft > 0) {
      // Give the dynamic filter a chance to narrow the scan before any file is planned.
      return dynamicFilter
          .isBlocked()
          .thenApply(ignored -> EMPTY_BATCH)
          .completeOnTimeout(EMPTY_BATCH, timeLeft, MILLISECONDS);
    }

    if (fileScanTaskIterable == null) {
      // Used to avoid duplicating work if the Dynamic Filter was already pushed down to the Iceberg
      // API
      boolean dynamicFilterIsComplete = dynamicFilter.isComplete();
      this.pushedDownDynamicFilterPredicate =
          dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast);
      TupleDomain<IcebergColumnHandle> fullPredicate =
          tableHandle.getUnenforcedPredicate().intersect(pushedDownDynamicFilterPredicate);
      // TODO: (https://github.com/trinodb/trino/issues/9743): Consider removing
      // TupleDomain#simplify
      TupleDomain<IcebergColumnHandle> simplifiedPredicate =
          fullPredicate.simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);
      boolean usedSimplifiedPredicate = !simplifiedPredicate.equals(fullPredicate);
      if (usedSimplifiedPredicate) {
        // Pushed down predicate was simplified, always evaluate it against individual splits
        this.pushedDownDynamicFilterPredicate = TupleDomain.all();
      }

      TupleDomain<IcebergColumnHandle> effectivePredicate =
          dataColumnPredicate.intersect(simplifiedPredicate);

      if (effectivePredicate.isNone()) {
        finish();
        return completedFuture(NO_MORE_SPLITS_BATCH);
      }

      Expression filterExpression = toIcebergExpression(effectivePredicate);
      // If the Dynamic Filter will be evaluated against each file, stats are required. Otherwise,
      // skip them.
      boolean requiresColumnStats = usedSimplifiedPredicate || !dynamicFilterIsComplete;
      TableScan scan = tableScan.filter(filterExpression);
      if (requiresColumnStats) {
        scan = scan.includeColumnStats();
      }
      if (tableScan instanceof ChangeTableIncrementalScan) {
        // NOTE(review): change-table incremental tasks are used exactly as planned, without
        // further splitting by target split size; presumably they must not be split by offset
        // — confirm.
        this.fileScanTaskIterable = scan.planFiles();
      } else {
        this.fileScanTaskIterable =
            TableScanUtil.splitFiles(scan.planFiles(), tableScan.targetSplitSize());
      }
      closer.register(fileScanTaskIterable);
      this.fileScanTaskIterator = fileScanTaskIterable.iterator();
      closer.register(fileScanTaskIterator);
      // TODO: Remove when NPE check has been released:
      // https://github.com/trinodb/trino/issues/15372
      isFinished();
    }

    TupleDomain<IcebergColumnHandle> dynamicFilterPredicate =
        dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast);
    if (dynamicFilterPredicate.isNone()) {
      finish();
      return completedFuture(NO_MORE_SPLITS_BATCH);
    }

    // Loop-invariant: the dynamic filter needs per-file evaluation only when it is restrictive and
    // differs from the predicate that was already pushed into the Iceberg scan.
    boolean evaluateDynamicFilter =
        !dynamicFilterPredicate.isAll()
            && !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate);

    Iterator<FileScanTask> fileScanTasks = Iterators.limit(fileScanTaskIterator, maxSize);
    ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
    while (fileScanTasks.hasNext()) {
      FileScanTask scanTask = fileScanTasks.next();
      // The size limit is applied only to files with no attached delete files.
      if (scanTask.deletes().isEmpty()
          && maxScannedFileSizeInBytes.isPresent()
          && scanTask.file().fileSizeInBytes() > maxScannedFileSizeInBytes.get()) {
        continue;
      }

      String filePath = scanTask.file().path().toString();
      if (!pathDomain.includesNullableValue(utf8Slice(filePath))) {
        continue;
      }
      if (!fileModifiedTimeDomain.isAll()) {
        long fileModifiedTime = getModificationTime(filePath);
        if (!fileModifiedTimeDomain.includesNullableValue(
            packDateTimeWithZone(fileModifiedTime, UTC_KEY))) {
          continue;
        }
      }

      Schema fileSchema = scanTask.spec().schema();
      Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(scanTask);

      Set<IcebergColumnHandle> identityPartitionColumns =
          partitionKeys.keySet().stream()
              .map(fieldId -> getColumnHandle(fileSchema.findField(fieldId), typeManager))
              .collect(toImmutableSet());

      // Partition bindings are deserialized lazily and at most once per task.
      Supplier<Map<ColumnHandle, NullableValue>> partitionValues =
          memoize(
              () -> {
                Map<ColumnHandle, NullableValue> bindings = new HashMap<>();
                for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
                  Object partitionValue =
                      deserializePartitionValue(
                          partitionColumn.getType(),
                          partitionKeys.get(partitionColumn.getId()).orElse(null),
                          partitionColumn.getName());
                  NullableValue bindingValue =
                      new NullableValue(partitionColumn.getType(), partitionValue);
                  bindings.put(partitionColumn, bindingValue);
                }
                return bindings;
              });

      if (evaluateDynamicFilter) {
        if (!partitionMatchesPredicate(
            identityPartitionColumns, partitionValues, dynamicFilterPredicate)) {
          continue;
        }
        if (!fileMatchesPredicate(
            fieldIdToType,
            dynamicFilterPredicate,
            scanTask.file().lowerBounds(),
            scanTask.file().upperBounds(),
            scanTask.file().nullValueCounts())) {
          continue;
        }
      }
      if (!partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint)) {
        continue;
      }

      // Built only for tasks that survived every filter above; it is still created before the
      // task is recorded as scanned, matching the previous ordering.
      IcebergSplit icebergSplit = toIcebergSplit(scanTask);
      if (recordScannedFiles) {
        // Positional and Equality deletes can only be cleaned up if the whole table has been
        // optimized.
        // Equality deletes may apply to many files, and position deletes may be grouped together.
        // This makes it difficult to know if they are obsolete.
        List<org.apache.iceberg.DeleteFile> fullyAppliedDeletes =
            tableHandle.getEnforcedPredicate().isAll() ? scanTask.deletes() : ImmutableList.of();
        scannedFiles.add(new DataFileWithDeleteFiles(scanTask.file(), fullyAppliedDeletes));
      }
      splits.add(icebergSplit);
    }
    return completedFuture(new ConnectorSplitBatch(splits.build(), isFinished()));
  }