in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkBatchProgram.scala [46:300]
/**
 * Assembles the chained optimization program used for batch queries.
 *
 * Stages are registered in a fixed order: sub-query rewrite, temporal join rewrite,
 * decorrelation, default rewrite, predicate pushdown, join reorder (optional), join rewrite,
 * project rewrite, logical optimization, logical rewrite, time indicator conversion, physical
 * optimization, physical rewrite, dynamic partition pruning and runtime filter.
 *
 * @param tableConfig
 *   configuration consulted for `OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED`;
 *   the join reorder stage is registered only when that option is enabled
 * @return
 *   the chained program containing every registered stage
 */
def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[BatchOptimizeContext] = {
  // The three factories below are `def`s rather than `val`s so that every use site performs
  // its own `newBuilder` call, exactly like the inline builder chains they stand in for.

  // Fresh group-program builder.
  def newGroup = FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext]

  // Fresh HEP builder preset to RULE_SEQUENCE execution and BOTTOM_UP matching.
  def sequenceHep =
    FlinkHepRuleSetProgramBuilder
      .newBuilder[BatchOptimizeContext]
      .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
      .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)

  // Fresh HEP builder preset to RULE_COLLECTION execution and BOTTOM_UP matching.
  def collectionHep =
    FlinkHepRuleSetProgramBuilder
      .newBuilder[BatchOptimizeContext]
      .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
      .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)

  val batchProgram = new FlinkChainedProgram[BatchOptimizeContext]()

  // rewrite sub-queries to joins
  val subqueryRewrite = newGroup
    // rewrite QueryOperationCatalogViewTable before rewriting sub-queries
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.TABLE_REF_RULES).build(),
      "convert table references before rewriting sub-queries to semi-join"
    )
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.SEMI_JOIN_RULES).build(),
      "rewrite sub-queries to semi-join"
    )
    .addProgram(
      collectionHep.add(FlinkBatchRuleSets.TABLE_SUBQUERY_RULES).build(),
      "sub-queries remove"
    )
    // convert RelOptTableImpl (which exists in SubQuery before) to FlinkRelOptTable
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.TABLE_REF_RULES).build(),
      "convert table references after sub-queries removed"
    )
    .build()
  batchProgram.addLast(SUBQUERY_REWRITE, subqueryRewrite)

  // rewrite special temporal join plan
  val temporalJoinRewrite = newGroup
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.EXPAND_PLAN_RULES).build(),
      "convert correlate to temporal table join"
    )
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.POST_EXPAND_CLEAN_UP_RULES).build(),
      "convert enumerable table scan"
    )
    .build()
  batchProgram.addLast(TEMPORAL_JOIN_REWRITE, temporalJoinRewrite)

  // query decorrelation
  batchProgram.addLast(DECORRELATE, new FlinkDecorrelateProgram)

  // default rewrite, includes: predicate simplification, expression reduction, etc.
  batchProgram.addLast(
    DEFAULT_REWRITE,
    sequenceHep.add(FlinkBatchRuleSets.DEFAULT_REWRITE_RULES).build()
  )

  // rule based optimization: push down predicate(s)
  // Nested group 1: predicate rewrite, with its iteration count set to 5.
  val predicateRewrite = newGroup
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.JOIN_PREDICATE_REWRITE_RULES).build(),
      "join predicate rewrite"
    )
    .addProgram(
      collectionHep.add(FlinkBatchRuleSets.FILTER_PREPARE_RULES).build(),
      "other predicate rewrite"
    )
    .setIterations(5)
    .build()
  // Nested group 2: push predicates into the table scan.
  // PUSH_PARTITION_DOWN_RULES should always be in front of PUSH_FILTER_DOWN_RULES
  // to prevent PUSH_FILTER_DOWN_RULES from consuming the predicates in partitions
  val pushIntoTableScan = newGroup
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.PUSH_PARTITION_DOWN_RULES).build(),
      "push down partitions into table scan"
    )
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.PUSH_FILTER_DOWN_RULES).build(),
      "push down filters into table scan"
    )
    .build()
  val predicatePushdown = newGroup
    .addProgram(predicateRewrite, "predicate rewrite")
    .addProgram(pushIntoTableScan, "push predicate into table scan")
    .addProgram(
      sequenceHep.add(FlinkBatchRuleSets.PRUNE_EMPTY_RULES).build(),
      "prune empty after predicate push down"
    )
    .addProgram(new FlinkRecomputeStatisticsProgram, "recompute statistics")
    .build()
  batchProgram.addLast(PREDICATE_PUSHDOWN, predicatePushdown)

  // join reorder, registered only when enabled in the table configuration
  if (tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
    val joinReorder = newGroup
      .addProgram(
        collectionHep.add(FlinkBatchRuleSets.JOIN_REORDER_PREPARE_RULES).build(),
        "merge join into MultiJoin"
      )
      .addProgram(
        sequenceHep.add(FlinkBatchRuleSets.JOIN_REORDER_RULES).build(),
        "do join reorder"
      )
      .build()
    batchProgram.addLast(JOIN_REORDER, joinReorder)
  }

  // join rewrite
  val joinRewrite = newGroup
    .addProgram(
      collectionHep.add(FlinkBatchRuleSets.JOIN_COND_EQUAL_TRANSFER_RULES).build(),
      "simplify and push down join predicates"
    )
    .addProgram(
      collectionHep.add(FlinkBatchRuleSets.JOIN_NULL_FILTER_RULES).build(),
      "deal with possible null join keys"
    )
    .build()
  batchProgram.addLast(JOIN_REWRITE, joinRewrite)

  // project rewrite: registers PROJECT_RULES under the PROJECT_REWRITE stage name
  // NOTE(review): this stage used to be commented as "window rewrite" -- confirm whether
  // PROJECT_RULES is window-specific before relying on either label
  batchProgram.addLast(
    PROJECT_REWRITE,
    collectionHep.add(FlinkBatchRuleSets.PROJECT_RULES).build()
  )

  // optimize the logical plan
  batchProgram.addLast(
    LOGICAL,
    FlinkVolcanoProgramBuilder.newBuilder
      .add(FlinkBatchRuleSets.LOGICAL_OPT_RULES)
      .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
      .build()
  )

  // logical rewrite
  batchProgram.addLast(
    LOGICAL_REWRITE,
    sequenceHep.add(FlinkBatchRuleSets.LOGICAL_REWRITE).build()
  )

  // convert time indicators
  batchProgram.addLast(TIME_INDICATOR, new FlinkRelTimeIndicatorProgram)

  // optimize the physical plan
  batchProgram.addLast(
    PHYSICAL,
    FlinkVolcanoProgramBuilder.newBuilder
      .add(FlinkBatchRuleSets.PHYSICAL_OPT_RULES)
      .setRequiredOutputTraits(Array(FlinkConventions.BATCH_PHYSICAL))
      .build()
  )

  // physical rewrite
  batchProgram.addLast(
    PHYSICAL_REWRITE,
    sequenceHep.add(FlinkBatchRuleSets.PHYSICAL_REWRITE).build()
  )

  // convert dynamic partition pruning scan source
  batchProgram.addLast(DYNAMIC_PARTITION_PRUNING, new FlinkDynamicPartitionPruningProgram)

  // runtime filter optimization
  batchProgram.addLast(RUNTIME_FILTER, new FlinkRuntimeFilterProgram)

  batchProgram
}