def buildProgram()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkStreamProgram.scala (lines 44-330)


  def buildProgram(tableConfig: ReadableConfig): FlinkChainedProgram[StreamOptimizeContext] = {
    val chainedProgram = new FlinkChainedProgram[StreamOptimizeContext]()

    // rewrite sub-queries to joins
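    // (e.g. IN / EXISTS predicates typically become semi-joins, and NOT IN / NOT EXISTS
    // become anti-joins)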
    chainedProgram.addLast(
      SUBQUERY_REWRITE,
      FlinkGroupProgramBuilder
        .newBuilder[StreamOptimizeContext]
        // rewrite QueryOperationCatalogViewTable before rewriting sub-queries
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.TABLE_REF_RULES)
            .build(),
          "convert table references before rewriting sub-queries to semi-join"
        )
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.SEMI_JOIN_RULES)
            .build(),
          "rewrite sub-queries to semi-join"
        )
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.TABLE_SUBQUERY_RULES)
            .build(),
          "sub-queries remove"
        )
        // convert any remaining RelOptTableImpl (left over from the sub-queries above)
        // to FlinkRelOptTable
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.TABLE_REF_RULES)
            .build(),
          "convert table references after sub-queries removed"
        )
        .build()
    )

    // rewrite special temporal join plan
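    // (e.g. a LogicalCorrelate produced by a `FOR SYSTEM_TIME AS OF` join or a temporal
    // table function call is expanded into a dedicated temporal join plan)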
    chainedProgram.addLast(
      TEMPORAL_JOIN_REWRITE,
      FlinkGroupProgramBuilder
        .newBuilder[StreamOptimizeContext]
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.EXPAND_PLAN_RULES)
            .build(),
          "convert correlate to temporal table join"
        )
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.POST_EXPAND_CLEAN_UP_RULES)
            .build(),
          "convert enumerable table scan"
        )
        .build()
    )

    // query decorrelation
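    // (correlated sub-plans that survived SUBQUERY_REWRITE are turned into equivalent
    // join plans; FlinkDecorrelateProgram builds on Calcite-style decorrelation)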
    chainedProgram.addLast(
      DECORRELATE,
      FlinkGroupProgramBuilder
        .newBuilder[StreamOptimizeContext]
        // rewrite before decorrelation
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.PRE_DECORRELATION_RULES)
            .build(),
          "pre-rewrite before decorrelation"
        )
        .addProgram(new FlinkDecorrelateProgram)
        .build()
    )

    // default rewrite, including predicate simplification, expression reduction, window
    // properties rewrite, etc.
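    // (e.g. a constant expression such as 1 + 1 in a filter is folded at plan time)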
    chainedProgram.addLast(
      DEFAULT_REWRITE,
      FlinkHepRuleSetProgramBuilder.newBuilder
        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
        .add(FlinkStreamRuleSets.DEFAULT_REWRITE_RULES)
        .build()
    )

    // rule-based optimization: push predicates in the WHERE clause down towards the
    // sources, so that only the required data is read
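    // (e.g. a predicate on a partition column becomes partition pruning on the scan,
    // and other filters can be handed to sources that support filter push-down)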
    chainedProgram.addLast(
      PREDICATE_PUSHDOWN,
      FlinkGroupProgramBuilder
        .newBuilder[StreamOptimizeContext]
        .addProgram(
          FlinkGroupProgramBuilder
            .newBuilder[StreamOptimizeContext]
            .addProgram(
              FlinkHepRuleSetProgramBuilder
                .newBuilder[StreamOptimizeContext]
                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
                .add(FlinkStreamRuleSets.JOIN_PREDICATE_REWRITE_RULES)
                .build(),
              "join predicate rewrite"
            )
            .addProgram(
              FlinkHepRuleSetProgramBuilder.newBuilder
                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
                .add(FlinkStreamRuleSets.FILTER_PREPARE_RULES)
                .build(),
              "filter rules"
            )
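            // repeat the two programs above (at most 5 times) so that rewrites exposed
            // by one pass can be picked up by the next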
            .setIterations(5)
            .build(),
          "predicate rewrite"
        )
        .addProgram(
          // PUSH_PARTITION_DOWN_RULES should always be in front of PUSH_FILTER_DOWN_RULES
          // to prevent PUSH_FILTER_DOWN_RULES from consuming the predicates in partitions
          FlinkGroupProgramBuilder
            .newBuilder[StreamOptimizeContext]
            .addProgram(
              FlinkHepRuleSetProgramBuilder.newBuilder
                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
                .add(FlinkStreamRuleSets.PUSH_PARTITION_DOWN_RULES)
                .build(),
              "push down partitions into table scan"
            )
            .addProgram(
              FlinkHepRuleSetProgramBuilder.newBuilder
                .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
                .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
                .add(FlinkStreamRuleSets.PUSH_FILTER_DOWN_RULES)
                .build(),
              "push down filters into table scan"
            )
            .build(),
          "push predicate into table scan"
        )
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.PRUNE_EMPTY_RULES)
            .build(),
          "prune empty after predicate push down"
        )
        .build()
    )

    // join reorder
    if (tableConfig.get(OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED)) {
      chainedProgram.addLast(
        JOIN_REORDER,
        FlinkGroupProgramBuilder
          .newBuilder[StreamOptimizeContext]
          .addProgram(
            FlinkHepRuleSetProgramBuilder.newBuilder
              .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
              .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
              .add(FlinkStreamRuleSets.JOIN_REORDER_PREPARE_RULES)
              .build(),
            "merge join into MultiJoin"
          )
          .addProgram(
            FlinkHepRuleSetProgramBuilder.newBuilder
              .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
              .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
              .add(FlinkStreamRuleSets.JOIN_REORDER_RULES)
              .build(),
            "do join reorder"
          )
          .build()
      )
    }

    // project rewrite
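    // (e.g. merge adjacent projections and drop projections that are no-ops)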
    chainedProgram.addLast(
      PROJECT_REWRITE,
      FlinkHepRuleSetProgramBuilder.newBuilder
        .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
        .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
        .add(FlinkStreamRuleSets.PROJECT_RULES)
        .build()
    )

    // optimize the logical plan
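    // cost-based search: the Volcano planner applies LOGICAL_OPT_RULES and keeps the
    // cheapest plan that satisfies the LOGICAL convention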
    chainedProgram.addLast(
      LOGICAL,
      FlinkVolcanoProgramBuilder.newBuilder
        .add(FlinkStreamRuleSets.LOGICAL_OPT_RULES)
        .setRequiredOutputTraits(Array(FlinkConventions.LOGICAL))
        .build()
    )

    // logical rewrite
    chainedProgram.addLast(
      LOGICAL_REWRITE,
      FlinkGroupProgramBuilder
        .newBuilder[StreamOptimizeContext]
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.LOGICAL_REWRITE)
            .build())
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(EventTimeTemporalJoinRewriteRule.EVENT_TIME_TEMPORAL_JOIN_REWRITE_RULES)
            .build())
        .build()
    )

    // convert time indicators
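    // (rowtime/proctime attributes are materialized into regular timestamps where they
    // are no longer used as time indicators)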
    chainedProgram.addLast(TIME_INDICATOR, new FlinkRelTimeIndicatorProgram)

    // optimize the physical plan
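    // second cost-based search: translate the logical plan into nodes that satisfy the
    // STREAM_PHYSICAL convention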
    chainedProgram.addLast(
      PHYSICAL,
      FlinkVolcanoProgramBuilder.newBuilder
        .add(FlinkStreamRuleSets.PHYSICAL_OPT_RULES)
        .setRequiredOutputTraits(Array(FlinkConventions.STREAM_PHYSICAL))
        .build()
    )

    // physical rewrite
    chainedProgram.addLast(
      PHYSICAL_REWRITE,
      FlinkGroupProgramBuilder
        .newBuilder[StreamOptimizeContext]
        // add a HEP program for watermark transpose rules to make this optimization
        // deterministic. Applying these rules before the changelog mode inference is
        // important because:
        // 1. if we transpose calc and projection before we infer the changelog mode, we
        //    can, e.g., project out metadata columns, which may allow us to drop the
        //    changelog normalize node entirely
        // 2. if we push a condition into a changelog normalize, we may emit only
        //    UPDATE_AFTER when the downstream operators don't need UPDATE_BEFORE; without
        //    the push we would have to emit UPDATE_BEFORE. With the condition evaluated
        //    inside the changelog normalize, we can emit DELETEs instead of UPDATE_BEFORE
        //    when the condition is not met.
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.CHANGELOG_NORMALIZE_TRANSPOSE_RULES)
            .build(),
          "watermark transpose"
        )
        .addProgram(new FlinkChangelogModeInferenceProgram, "Changelog mode inference")
        .addProgram(
          new FlinkMiniBatchIntervalTraitInitProgram,
          "Initialization for mini-batch interval inference")
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE)
            .setHepMatchOrder(HepMatchOrder.TOP_DOWN)
            .add(FlinkStreamRuleSets.MINI_BATCH_RULES)
            .build(),
          "mini-batch interval rules"
        )
        .addProgram(
          FlinkHepRuleSetProgramBuilder.newBuilder
            .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION)
            .setHepMatchOrder(HepMatchOrder.BOTTOM_UP)
            .add(FlinkStreamRuleSets.PHYSICAL_REWRITE)
            .build(),
          "physical rewrite"
        )
        .build()
    )

    chainedProgram
  }
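
For orientation, a minimal usage sketch of the returned program chain. This is hypothetical driver code: `logicalPlan` and `optimizeContext` are placeholders for the Calcite RelNode to optimize and a StreamOptimizeContext supplied by the stream optimizer; only buildProgram and the FlinkChainedProgram type come from the snippet above.

  // build the phase chain once, honoring the session configuration
  val programs: FlinkChainedProgram[StreamOptimizeContext] =
    FlinkStreamProgram.buildProgram(tableConfig)

  // run all registered phases in order; each phase consumes the plan
  // produced by the previous one and returns the rewritten RelNode
  val optimizedPlan: RelNode = programs.optimize(logicalPlan, optimizeContext)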