in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergSplitSource.java [176:318]
/**
 * Produces the next batch of splits for this scan.
 *
 * <p>The call goes through three phases:
 *
 * <ol>
 *   <li>While the dynamic filter is still awaitable and the wait timeout has not elapsed, no
 *       splits are produced; a future that resolves to {@code EMPTY_BATCH} is returned instead.
 *   <li>The first call that gets past the wait plans the file scan, pushing the predicate known
 *       at that moment down into the table scan.
 *   <li>Up to {@code maxSize} file scan tasks are pulled from the planned scan and each one is
 *       filtered before being emitted as a split.
 * </ol>
 *
 * @param maxSize maximum number of file scan tasks consumed from the underlying iterator in this
 *     call; tasks rejected by the filters still count against it, so the returned batch may hold
 *     fewer splits
 * @return a future holding the batch; it is already completed on every path except the
 *     dynamic-filter wait, where it completes when the filter unblocks or the remaining wait
 *     time runs out
 */
public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize) {
// Remaining budget for waiting on the dynamic filter, measured from the stopwatch.
long timeLeft =
dynamicFilteringWaitTimeoutMillis - dynamicFilterWaitStopwatch.elapsed(MILLISECONDS);
if (dynamicFilter.isAwaitable() && timeLeft > 0) {
// Hand back an empty batch once the filter unblocks or the remaining time expires, whichever
// comes first. Presumably the engine then calls this method again -- confirm against the SPI.
return dynamicFilter
.isBlocked()
.thenApply(ignored -> EMPTY_BATCH)
.completeOnTimeout(EMPTY_BATCH, timeLeft, MILLISECONDS);
}
// Plan the scan lazily, only once: fileScanTaskIterable stays non-null after the first pass.
if (fileScanTaskIterable == null) {
// Used to avoid duplicating work if the Dynamic Filter was already pushed down to the Iceberg
// API
// Completeness is captured before the predicate is read.
boolean dynamicFilterIsComplete = dynamicFilter.isComplete();
// Remember the predicate snapshot handed to the scan so the per-split check below can tell
// whether the filter has changed since planning.
this.pushedDownDynamicFilterPredicate =
dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast);
TupleDomain<IcebergColumnHandle> fullPredicate =
tableHandle.getUnenforcedPredicate().intersect(pushedDownDynamicFilterPredicate);
// TODO: (https://github.com/trinodb/trino/issues/9743): Consider removing
// TupleDomain#simplify
TupleDomain<IcebergColumnHandle> simplifiedPredicate =
fullPredicate.simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);
boolean usedSimplifiedPredicate = !simplifiedPredicate.equals(fullPredicate);
if (usedSimplifiedPredicate) {
// Pushed down predicate was simplified, always evaluate it against individual splits
// Resetting to all() makes the equality test in the per-split check fail for any
// non-trivial dynamic filter, which forces that evaluation.
this.pushedDownDynamicFilterPredicate = TupleDomain.all();
}
TupleDomain<IcebergColumnHandle> effectivePredicate =
dataColumnPredicate.intersect(simplifiedPredicate);
if (effectivePredicate.isNone()) {
// The combined predicate is unsatisfiable: finish without planning any files.
finish();
return completedFuture(NO_MORE_SPLITS_BATCH);
}
Expression filterExpression = toIcebergExpression(effectivePredicate);
// If the Dynamic Filter will be evaluated against each file, stats are required. Otherwise,
// skip them.
boolean requiresColumnStats = usedSimplifiedPredicate || !dynamicFilterIsComplete;
TableScan scan = tableScan.filter(filterExpression);
if (requiresColumnStats) {
scan = scan.includeColumnStats();
}
// For a ChangeTableIncrementalScan the planned files are used as-is; for any other scan they
// are additionally split by the scan's target split size.
if (tableScan instanceof ChangeTableIncrementalScan) {
this.fileScanTaskIterable = scan.planFiles();
} else {
this.fileScanTaskIterable =
TableScanUtil.splitFiles(scan.planFiles(), tableScan.targetSplitSize());
}
// Both the iterable and its iterator are registered with the closer.
closer.register(fileScanTaskIterable);
this.fileScanTaskIterator = fileScanTaskIterable.iterator();
closer.register(fileScanTaskIterator);
// TODO: Remove when NPE check has been released:
// https://github.com/trinodb/trino/issues/15372
// The return value is intentionally discarded here.
isFinished();
}
// Re-read the dynamic filter on every call; it may differ from the snapshot that was pushed
// down when the scan was planned.
TupleDomain<IcebergColumnHandle> dynamicFilterPredicate =
dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast);
if (dynamicFilterPredicate.isNone()) {
// The dynamic filter now rejects everything, so no further split can match.
finish();
return completedFuture(NO_MORE_SPLITS_BATCH);
}
// At most maxSize tasks are consumed from the shared iterator, including ones skipped below.
Iterator<FileScanTask> fileScanTasks = Iterators.limit(fileScanTaskIterator, maxSize);
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
while (fileScanTasks.hasNext()) {
FileScanTask scanTask = fileScanTasks.next();
// Skip files above the configured size limit, but only when they carry no delete files.
if (scanTask.deletes().isEmpty()
&& maxScannedFileSizeInBytes.isPresent()
&& scanTask.file().fileSizeInBytes() > maxScannedFileSizeInBytes.get()) {
continue;
}
// Skip files whose path is outside the path domain.
if (!pathDomain.includesNullableValue(utf8Slice(scanTask.file().path().toString()))) {
continue;
}
// The modification time is only looked up when the domain actually restricts it.
if (!fileModifiedTimeDomain.isAll()) {
long fileModifiedTime = getModificationTime(scanTask.file().path().toString());
if (!fileModifiedTimeDomain.includesNullableValue(
packDateTimeWithZone(fileModifiedTime, UTC_KEY))) {
continue;
}
}
// The split is built here but only added to the batch if every remaining filter passes.
IcebergSplit icebergSplit = toIcebergSplit(scanTask);
Schema fileSchema = scanTask.spec().schema();
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(scanTask);
// Column handles for the partition key field ids, resolved against the task's spec schema.
Set<IcebergColumnHandle> identityPartitionColumns =
partitionKeys.keySet().stream()
.map(fieldId -> getColumnHandle(fileSchema.findField(fieldId), typeManager))
.collect(toImmutableSet());
// Partition values are deserialized lazily and memoized, so the work happens at most once
// per task and only if one of the checks below asks for the values.
Supplier<Map<ColumnHandle, NullableValue>> partitionValues =
memoize(
() -> {
Map<ColumnHandle, NullableValue> bindings = new HashMap<>();
for (IcebergColumnHandle partitionColumn : identityPartitionColumns) {
Object partitionValue =
deserializePartitionValue(
partitionColumn.getType(),
partitionKeys.get(partitionColumn.getId()).orElse(null),
partitionColumn.getName());
NullableValue bindingValue =
new NullableValue(partitionColumn.getType(), partitionValue);
bindings.put(partitionColumn, bindingValue);
}
return bindings;
});
// Evaluate the dynamic filter per split only when it is non-trivial and differs from the
// predicate that was pushed down at planning time.
if (!dynamicFilterPredicate.isAll()
&& !dynamicFilterPredicate.equals(pushedDownDynamicFilterPredicate)) {
if (!partitionMatchesPredicate(
identityPartitionColumns, partitionValues, dynamicFilterPredicate)) {
continue;
}
// Second check uses the file's lower/upper bounds and null counts.
if (!fileMatchesPredicate(
fieldIdToType,
dynamicFilterPredicate,
scanTask.file().lowerBounds(),
scanTask.file().upperBounds(),
scanTask.file().nullValueCounts())) {
continue;
}
}
// Finally apply the connector constraint to the partition values.
if (!partitionMatchesConstraint(identityPartitionColumns, partitionValues, constraint)) {
continue;
}
if (recordScannedFiles) {
// Positional and Equality deletes can only be cleaned up if the whole table has been
// optimized.
// Equality deletes may apply to many files, and position deletes may be grouped together.
// This makes it difficult to know if they are obsolete.
// Hence the delete files are recorded only when the enforced predicate is all().
List<org.apache.iceberg.DeleteFile> fullyAppliedDeletes =
tableHandle.getEnforcedPredicate().isAll() ? scanTask.deletes() : ImmutableList.of();
scannedFiles.add(new DataFileWithDeleteFiles(scanTask.file(), fullyAppliedDeletes));
}
splits.add(icebergSplit);
}
// The batch's second argument is the current isFinished() state.
return completedFuture(new ConnectorSplitBatch(splits.build(), isFinished()));
}