in paimon-presto-common/src/main/java/org/apache/paimon/presto/PrestoComputePushdown.java [174:285]
/**
 * Rewrites a filter that sits directly on top of a table scan so that the scan's table handle
 * carries pushdown information derived from the filter predicate.
 *
 * <p>The rewritten scan's {@link PrestoTableHandle} is built from the old handle's schema name,
 * table name and serialized table, plus:
 *
 * <ul>
 *   <li>the tuple domain extracted from the predicate, keyed by {@link PrestoColumnHandle};
 *   <li>the result of {@code extractColumns} applied to the predicate and scan assignments;
 *   <li>the result of {@code getRemainingPartition}, computed only when partition pruning is
 *       enabled for the session and the remaining (non-domain) expression references at least
 *       one partition key; otherwise {@link Optional#empty()}.
 * </ul>
 *
 * <p>The returned {@link FilterNode} keeps the original id, source location and the complete
 * original predicate; only its source is replaced by the rewritten scan.
 *
 * @param filter the filter node being visited
 * @param context unused
 * @return {@code filter} itself when its source is not a {@link TableScanNode}; otherwise a new
 *     filter node over the rewritten table scan
 */
public PlanNode visitFilter(FilterNode filter, Void context) {
    PlanNode source = filter.getSource();
    if (!(source instanceof TableScanNode)) {
        return filter;
    }
    TableScanNode scan = (TableScanNode) source;
    Map<VariableReferenceExpression, ColumnHandle> assignments = scan.getAssignments();

    // Index the scan's column handles by the name of the variable they are assigned to.
    Map<String, PrestoColumnHandle> handlesByName =
            assignments.entrySet().stream()
                    .collect(
                            Collectors.toMap(
                                    entry -> entry.getKey().getName(),
                                    entry -> (PrestoColumnHandle) entry.getValue()));

    // Split the predicate into a tuple domain over subfields and a remaining expression.
    RowExpression predicate = filter.getPredicate();
    DomainTranslator domainTranslator = rowExpressionService.getDomainTranslator();
    SubfieldExtractor subfieldExtractor =
            new SubfieldExtractor(
                    functionResolution, rowExpressionService.getExpressionOptimizer(), session);
    DomainTranslator.ExtractionResult<Subfield> extraction =
            domainTranslator.fromPredicate(
                    session, predicate, subfieldExtractor.toColumnExtractor());

    // Re-key the domain: subfield -> root column name -> paimon column handle.
    // Subfields with a non-empty path are mapped to null here.
    // NOTE(review): presumably TupleDomain.transform drops null-mapped keys - confirm.
    TupleDomain<String> domainByRootName =
            extraction
                    .getTupleDomain()
                    .transform(
                            subfield -> {
                                if (!subfield.getPath().isEmpty()) {
                                    return null;
                                }
                                return subfield.getRootName();
                            });
    TupleDomain<PrestoColumnHandle> columnDomain = domainByRootName.transform(handlesByName::get);

    // Columns derived from the filter predicate.
    Optional<List<ColumnHandle>> predicateColumns = extractColumns(predicate, assignments);

    PrestoTableHandle currentHandle = (PrestoTableHandle) scan.getTable().getConnectorHandle();
    Table table = currentHandle.table();

    // Resolve every variable gathered from the remaining expression to its column handle.
    ImmutableSet.Builder<VariableReferenceExpression> residualVariables = ImmutableSet.builder();
    extraction
            .getRemainingExpression()
            .accept(new VariableCollector(scan.getOutputVariables()), residualVariables);
    Set<ColumnHandle> residualColumns = new HashSet<>();
    for (VariableReferenceExpression variable : residualVariables.build()) {
        residualColumns.add(
                Preconditions.checkNotNull(
                        assignments.get(variable),
                        "The variable is" + " not found in the assignments"));
    }

    // Partition pruning: only list and prune partitions when the session enables it and the
    // remaining expression touches at least one partition key.
    Set<String> partitionKeys = new HashSet<>(table.partitionKeys());
    Optional<List<Map<String, String>>> prunedPartitions = Optional.empty();
    if (PrestoSessionProperties.isPartitionPruneEnabled(session)) {
        boolean touchesPartitionKey = false;
        for (ColumnHandle column : residualColumns) {
            String columnName = ((PrestoColumnHandle) column).getColumnName();
            if (partitionKeys.contains(columnName)) {
                touchesPartitionKey = true;
                break;
            }
        }
        if (touchesPartitionKey) {
            Map<String, ColumnHandle> allColumns =
                    transactionManager
                            .get(scan.getTable().getTransaction())
                            .getColumnHandles(session, scan.getTable().getConnectorHandle());
            prunedPartitions =
                    getRemainingPartition(table, extraction, session, columnDomain, allColumns);
        }
    }

    // Assemble the new table handle and layout that carry the pushdown information.
    PrestoTableHandle pushdownHandle =
            new PrestoTableHandle(
                    currentHandle.getSchemaName(),
                    currentHandle.getTableName(),
                    currentHandle.getSerializedTable(),
                    columnDomain,
                    predicateColumns,
                    prunedPartitions);
    PrestoTableLayoutHandle pushdownLayout =
            new PrestoTableLayoutHandle(pushdownHandle, scan.getCurrentConstraint());
    TableHandle pushdownTable =
            new TableHandle(
                    scan.getTable().getConnectorId(),
                    pushdownHandle,
                    scan.getTable().getTransaction(),
                    Optional.of(pushdownLayout));

    // Same scan id, outputs, assignments and constraints; only the table handle changes.
    TableScanNode rewrittenScan =
            new TableScanNode(
                    scan.getSourceLocation(),
                    scan.getId(),
                    pushdownTable,
                    scan.getOutputVariables(),
                    scan.getAssignments(),
                    scan.getCurrentConstraint(),
                    scan.getEnforcedConstraint());

    // The full predicate stays on the filter node.
    return new FilterNode(
            filter.getSourceLocation(), filter.getId(), rewrittenScan, predicate);
}