in samza-sql/src/main/java/org/apache/samza/sql/planner/SamzaSqlFilterRemoteJoinRule.java [72:241]
/**
 * Tries to push filter conjuncts below a join whose inputs may include a remote table.
 *
 * <p>Conjuncts taken from the filter above the join (typically WHERE clause predicates) and
 * from the join's own ON condition are classified per join input and, when eligible, are
 * applied as filters directly on top of that input. Nothing is ever pushed <em>into</em> the
 * join condition, and nothing is pushed to an input that is a remote table (the lookup has to
 * happen before its columns can be filtered). The rule is disabled entirely when either input
 * is a local table.
 *
 * <p>If the rewrite would not change anything the method returns without calling
 * {@code call.transformTo}.
 *
 * @param call   rule call used to obtain the {@code RelBuilder}, the planner and to register
 *               the transformed plan
 * @param filter filter sitting on top of the join, or {@code null} when only the join matched
 * @param join   join whose inputs are candidates for receiving pushed-down filters
 */
protected void perform(RelOptRuleCall call, Filter filter,
    Join join) {
  final List<RexNode> joinFilters =
      RelOptUtil.conjunctions(join.getCondition());

  boolean donotOptimizeLeft = false;
  boolean donotOptimizeRight = false;

  JoinInputNode.InputType inputTypeOnLeft =
      JoinInputNode.getInputType(join.getLeft(), systemStreamConfigBySource);
  JoinInputNode.InputType inputTypeOnRight =
      JoinInputNode.getInputType(join.getRight(), systemStreamConfigBySource);

  // Disable this optimization for queries using local table.
  if (inputTypeOnLeft == InputType.LOCAL_TABLE || inputTypeOnRight == InputType.LOCAL_TABLE) {
    donotOptimizeLeft = true;
    donotOptimizeRight = true;
  }

  // There is nothing to optimize on the remote table side as the lookup needs to happen first
  // before filtering.
  if (inputTypeOnLeft == InputType.REMOTE_TABLE) {
    donotOptimizeLeft = true;
  }
  if (inputTypeOnRight == InputType.REMOTE_TABLE) {
    donotOptimizeRight = true;
  }

  // If there is only the joinRel,
  // make sure it does not match a cartesian product joinRel
  // (with "true" condition), otherwise this rule will be applied
  // again on the new cartesian product joinRel.
  if (filter == null && joinFilters.isEmpty()) {
    return;
  }

  final List<RexNode> aboveFilters =
      filter != null
          ? RelOptUtil.conjunctions(filter.getCondition())
          : new ArrayList<>();
  final ImmutableList<RexNode> origAboveFilters =
      ImmutableList.copyOf(aboveFilters);

  // Simplify Outer Joins
  JoinRelType joinType = join.getJoinType();
  if (smart
      && !origAboveFilters.isEmpty()
      && join.getJoinType() != JoinRelType.INNER) {
    joinType = RelOptUtil.simplifyJoin(join, origAboveFilters, joinType);
  }

  final List<RexNode> leftFilters = new ArrayList<>();
  final List<RexNode> rightFilters = new ArrayList<>();

  // TODO - add logic to derive additional filters.  E.g., from
  // (t1.a = 1 AND t2.a = 2) OR (t1.b = 3 AND t2.b = 4), you can
  // derive table filters:
  // (t1.a = 1 OR t1.b = 3)
  // (t2.a = 2 OR t2.b = 4)

  // Try to push down above filters. These are typically where clause
  // filters. They can be pushed down if they are not on the NULL
  // generating side.
  // We do not push into join condition as we do not benefit much. There is also correctness
  // issue with remote table as we will not have values for the remote table before the
  // join/lookup.
  // leftFilters and rightFilters are populated in classifyFilters API.
  final boolean pushAboveToLeft = !joinType.generatesNullsOnLeft() && !donotOptimizeLeft;
  final boolean pushAboveToRight = !joinType.generatesNullsOnRight() && !donotOptimizeRight;
  boolean filterPushed = false;
  if (RelOptUtil.classifyFilters(
      join,
      aboveFilters,
      joinType,
      false, // Let's not push into join filter
      pushAboveToLeft,
      pushAboveToRight,
      joinFilters,
      leftFilters,
      rightFilters)) {
    filterPushed = true;
  }

  // Since nothing is pushed into the join condition, the only evidence of a push is a
  // non-empty per-input filter list; otherwise reset the filterPushed flag.
  if (leftFilters.isEmpty()
      && rightFilters.isEmpty()) {
    filterPushed = false;
  }

  boolean isAntiJoin = joinType == JoinRelType.ANTI;

  // Try to push down filters in ON clause. A ON clause filter can only be
  // pushed down if it does not affect the non-matching set, i.e. it is
  // not on the side which is preserved.
  // The left input is preserved exactly when the join generates nulls on the right (and vice
  // versa), so the eligibility flags are the mirror image of the ones used for the above
  // filters: an ON clause conjunct may go to the left only if the right side is not
  // null-generating, and to the right only if the left side is not null-generating.
  // A ON clause filter of anti-join can not be pushed down.
  final boolean pushOnToLeft = !joinType.generatesNullsOnRight() && !donotOptimizeLeft;
  final boolean pushOnToRight = !joinType.generatesNullsOnLeft() && !donotOptimizeRight;
  if (!isAntiJoin && RelOptUtil.classifyFilters(
      join,
      joinFilters,
      joinType,
      false,
      pushOnToLeft,
      pushOnToRight,
      joinFilters,
      leftFilters,
      rightFilters)) {
    filterPushed = true;
  }

  // if nothing actually got pushed and there is nothing leftover,
  // then this rule is a no-op
  if ((!filterPushed
      && joinType == join.getJoinType())
      || (joinFilters.isEmpty()
      && leftFilters.isEmpty()
      && rightFilters.isEmpty())) {
    return;
  }

  // create Filters on top of the children if any filters were
  // pushed to them
  final RexBuilder rexBuilder = join.getCluster().getRexBuilder();
  final RelBuilder relBuilder = call.builder();
  final RelNode leftRel = relBuilder.push(join.getLeft()).filter(leftFilters).build();
  final RelNode rightRel = relBuilder.push(join.getRight()).filter(rightFilters).build();

  // create the new join node referencing the new children and
  // containing its new join filters (if there are any)
  final ImmutableList<RelDataType> fieldTypes =
      ImmutableList.<RelDataType>builder()
          .addAll(RelOptUtil.getFieldTypeList(leftRel.getRowType()))
          .addAll(RelOptUtil.getFieldTypeList(rightRel.getRowType())).build();
  final RexNode joinFilter =
      RexUtil.composeConjunction(rexBuilder,
          RexUtil.fixUp(rexBuilder, joinFilters, fieldTypes));

  // If nothing actually got pushed and there is nothing leftover,
  // then this rule is a no-op
  if (joinFilter.isAlwaysTrue()
      && leftFilters.isEmpty()
      && rightFilters.isEmpty()
      && joinType == join.getJoinType()) {
    return;
  }

  RelNode newJoinRel =
      join.copy(
          join.getTraitSet(),
          joinFilter,
          leftRel,
          rightRel,
          joinType,
          join.isSemiJoinDone());
  call.getPlanner().onCopy(join, newJoinRel);
  // The per-input filters can originate purely from the ON clause, in which case there is no
  // filter node above the join to report as the source of the copy.
  if (filter != null && !leftFilters.isEmpty()) {
    call.getPlanner().onCopy(filter, leftRel);
  }
  if (filter != null && !rightFilters.isEmpty()) {
    call.getPlanner().onCopy(filter, rightRel);
  }

  relBuilder.push(newJoinRel);

  // Create a project on top of the join if some of the columns have become
  // NOT NULL due to the join-type getting stricter.
  relBuilder.convert(join.getRowType(), false);

  // create a FilterRel on top of the join if needed
  relBuilder.filter(
      RexUtil.fixUp(rexBuilder, aboveFilters,
          RelOptUtil.getFieldTypeList(relBuilder.peek().getRowType())));

  call.transformTo(relBuilder.build());
}