def visit()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [560:858]


    /**
     * Tries to rewrite `rel` (and, recursively, its inputs) so that it satisfies the update kind
     * required by its parent.
     *
     * Each branch decides which [[UpdateKindTrait]] must be required from the inputs of the given
     * node type, visits the inputs with that requirement, and then builds the new node through
     * `createNewNode` with the update kind the node itself provides.
     *
     * @param rel
     *   the physical node to visit
     * @param requiredUpdateTrait
     *   the update kind the parent requires from `rel`
     * @return
     *   `Some(newNode)` when the requirement can be satisfied, `None` when the node (or one of
     *   its inputs) cannot satisfy it
     * @throws UnsupportedOperationException
     *   if `rel` is of a node type that is not handled by any branch
     */
    def visit(
        rel: StreamPhysicalRel,
        requiredUpdateTrait: UpdateKindTrait): Option[StreamPhysicalRel] =
      rel match {
        case sink: StreamPhysicalSink =>
          val sinkRequiredTraits = inferSinkRequiredTraits(sink)
          val upsertMaterialize = analyzeUpsertMaterializeStrategy(sink)
          visitSink(sink.copy(upsertMaterialize), sinkRequiredTraits)

        case sink: StreamPhysicalLegacySink[_] =>
          val childModifyKindSet = getModifyKindSet(sink.getInput)
          val onlyAfter = onlyAfterOrNone(childModifyKindSet)
          val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
          // candidate traits are listed in order of preference
          val sinkRequiredTraits = sink.sink match {
            case _: UpsertStreamTableSink[_] =>
              // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
              Seq(onlyAfter, beforeAndAfter)
            case _: RetractStreamTableSink[_] =>
              Seq(beforeAndAfter)
            case _: AppendStreamTableSink[_] | _: StreamTableSink[_] =>
              Seq(UpdateKindTrait.NONE)
            case ds: DataStreamTableSink[_] =>
              if (ds.withChangeFlag) {
                if (ds.needUpdateBefore) {
                  Seq(beforeAndAfter)
                } else {
                  // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
                  Seq(onlyAfter, beforeAndAfter)
                }
              } else {
                Seq(UpdateKindTrait.NONE)
              }
          }
          visitSink(sink, sinkRequiredTraits)

        case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
            _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
            _: StreamPhysicalWindowAggregate | _: StreamPhysicalOverAggregate =>
          // Aggregate, TableAggregate, OverAggregate, Limit, GroupWindowAggregate, WindowAggregate,
          // and WindowTableAggregate require update_before if there are updates
          val requiredChildUpdateTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
          val children = visitChildren(rel, requiredChildUpdateTrait)
          // use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
          createNewNode(rel, children, requiredUpdateTrait)

        case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
            _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | _: StreamPhysicalIntervalJoin |
            _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
          // WindowRank, WindowDeduplicate, TemporalSort, CEP (Match), IntervalJoin,
          // PythonOverAggregate and WindowJoin require nothing about UpdateKind.
          val children = visitChildren(rel, UpdateKindTrait.NONE)
          createNewNode(rel, children, requiredUpdateTrait)

        case rank: StreamPhysicalRank =>
          val rankStrategies =
            RankProcessStrategy.analyzeRankProcessStrategies(rank, rank.partitionKey, rank.orderKey)
          visitRankStrategies(
            rankStrategies,
            requiredUpdateTrait,
            rankStrategy => rank.copy(rankStrategy))

        case sortLimit: StreamPhysicalSortLimit =>
          // a sort-limit is analyzed like a rank with an empty partition key
          val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(
            sortLimit,
            ImmutableBitSet.of(),
            sortLimit.getCollation)
          visitRankStrategies(
            rankStrategies,
            requiredUpdateTrait,
            rankStrategy => sortLimit.copy(rankStrategy))

        case sort: StreamPhysicalSort =>
          val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(sort.getInput))
          val children = visitChildren(sort, requiredChildTrait)
          createNewNode(sort, children, requiredUpdateTrait)

        case join: StreamPhysicalJoin =>
          val onlyAfterByParent = requiredUpdateTrait.updateKind == UpdateKind.ONLY_UPDATE_AFTER
          val children = join.getInputs.zipWithIndex.map {
            case (child, childOrdinal) =>
              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
              val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal)
              val inputModifyKindSet = getModifyKindSet(physicalChild)
              if (onlyAfterByParent) {
                if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) {
                  // the parent requires only-after, however, the join doesn't support this
                  None
                } else {
                  this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet))
                }
              } else {
                this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet))
              }
          }
          // the join can only be created if every input could satisfy its requirement
          if (children.exists(_.isEmpty)) {
            None
          } else {
            createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait)
          }

        case temporalJoin: StreamPhysicalTemporalJoin =>
          val left = temporalJoin.getLeft.asInstanceOf[StreamPhysicalRel]
          val right = temporalJoin.getRight.asInstanceOf[StreamPhysicalRel]

          // the left input required trait depends on its parent in temporal join
          // the left input will send message to parent
          val requiredUpdateBeforeByParent =
            requiredUpdateTrait.updateKind == UpdateKind.BEFORE_AND_AFTER
          val leftInputModifyKindSet = getModifyKindSet(left)
          val leftRequiredTrait = if (requiredUpdateBeforeByParent) {
            beforeAfterOrNone(leftInputModifyKindSet)
          } else {
            onlyAfterOrNone(leftInputModifyKindSet)
          }
          val newLeftOption = this.visit(left, leftRequiredTrait)

          val rightInputModifyKindSet = getModifyKindSet(right)
          // currently temporal join support changelog stream as the right side
          // so it supports both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
          val newRightOption = this.visit(right, onlyAfterOrNone(rightInputModifyKindSet)) match {
            case Some(newRight) => Some(newRight)
            case None =>
              // ONLY_AFTER could not be satisfied, fall back to BEFORE_AFTER
              val beforeAfter = beforeAfterOrNone(rightInputModifyKindSet)
              this.visit(right, beforeAfter)
          }

          (newLeftOption, newRightOption) match {
            case (Some(newLeft), Some(newRight)) =>
              // the temporal join provides the update kind of its (rewritten) left input
              val leftTrait = newLeft.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
              createNewNode(temporalJoin, Some(List(newLeft, newRight)), leftTrait)
            case _ =>
              None
          }

        // if the condition is applied on the upsert key, we can emit whatever the requiredTrait
        // is, because we will filter all records based on the condition that applies to that key
        case calc: StreamPhysicalCalcBase =>
          if (
            requiredUpdateTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
            isNonUpsertKeyCondition(calc)
          ) {
            // we don't expect filter to satisfy ONLY_UPDATE_AFTER update kind,
            // to solve the bad case like a single 'cnt < 10' condition after aggregation.
            // See FLINK-9528.
            None
          } else {
            // otherwise, forward UpdateKind requirement
            visitChildren(rel, requiredUpdateTrait) match {
              case None => None
              case Some(children) =>
                // provide whatever the (first) rewritten child provides
                val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
                createNewNode(rel, Some(children), childTrait)
            }
          }

        case _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
            _: StreamPhysicalExchange | _: StreamPhysicalExpand |
            _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner |
            _: StreamPhysicalWindowTableFunction =>
          // transparent forward requiredTrait to children
          visitChildren(rel, requiredUpdateTrait) match {
            case None => None
            case Some(children) =>
              // provide whatever the (first) rewritten child provides
              val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
              createNewNode(rel, Some(children), childTrait)
          }

        case union: StreamPhysicalUnion =>
          val children = union.getInputs.map {
            case child: StreamPhysicalRel =>
              val childModifyKindSet = getModifyKindSet(child)
              // insert-only inputs have no updates, so nothing is required from them
              val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
                UpdateKindTrait.NONE
              } else {
                requiredUpdateTrait
              }
              this.visit(child, requiredChildTrait)
          }.toList

          if (children.exists(_.isEmpty)) {
            None
          } else {
            val newChildren = children.flatten
            val updateKinds = newChildren
              .map(_.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE))
            // union can just forward changes, can't actively satisfy to another changelog mode
            val providedTrait: Option[UpdateKindTrait] =
              if (updateKinds.forall(k => UpdateKindTrait.NONE == k)) {
                // if all the children is NO_UPDATE, union is NO_UPDATE
                Some(UpdateKindTrait.NONE)
              } else {
                // otherwise, merge update kinds. The accumulator becomes None as soon as two
                // inputs disagree, which avoids a non-local `return` from inside the lambda.
                updateKinds
                  .map(_.updateKind)
                  .foldLeft[Option[UpdateKind]](Some(UpdateKind.NONE)) {
                    case (None, _) => None
                    case (Some(UpdateKind.NONE), next) => Some(next)
                    case (merged @ Some(_), UpdateKind.NONE) => merged
                    case (merged @ Some(kind), next) if kind == next => merged
                    // UNION doesn't support to union ONLY_UPDATE_AFTER and BEFORE_AND_AFTER inputs
                    case _ => None
                  }
                  .map(kind => new UpdateKindTrait(kind))
              }
            providedTrait.flatMap(
              mergedTrait => createNewNode(union, Some(newChildren), mergedTrait))
          }

        case normalize: StreamPhysicalChangelogNormalize =>
          // changelog normalize currently only supports input only sending UPDATE_AFTER
          val children = visitChildren(normalize, UpdateKindTrait.ONLY_UPDATE_AFTER)
          // use requiredTrait as providedTrait,
          // because changelog normalize supports all kinds of UpdateKind
          createNewNode(rel, children, requiredUpdateTrait)

        case ts: StreamPhysicalTableSourceScan =>
          // the provided trait is derived from the changelog mode declared by the table source
          val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
          val newSource = createNewNode(rel, Some(List()), providedTrait)
          if (
            providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
            requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
          ) {
            // requiring only-after, but the source is CDC source, then drop update_before manually
            val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel)
            createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait)
          } else {
            newSource
          }

        case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
            _: StreamPhysicalValues =>
          // leaf nodes without inputs that always provide UpdateKindTrait.NONE
          createNewNode(rel, Some(List()), UpdateKindTrait.NONE)

        case scan: StreamPhysicalIntermediateTableScan =>
          val providedTrait = if (scan.intermediateTable.isUpdateBeforeRequired) {
            // we can't drop UPDATE_BEFORE if it is required by other parent blocks
            UpdateKindTrait.BEFORE_AND_AFTER
          } else {
            requiredUpdateTrait
          }
          if (!providedTrait.satisfies(requiredUpdateTrait)) {
            // require ONLY_AFTER but can only provide BEFORE_AND_AFTER
            None
          } else {
            createNewNode(rel, Some(List()), providedTrait)
          }

        case process: StreamPhysicalProcessTableFunction =>
          // Required update traits depend on the table argument declaration,
          // input traits, partition keys, and upsert keys
          val inputArgs = StreamPhysicalProcessTableFunction
            .getProvidedInputArgs(process.getCall)
          // NOTE(review): inputs whose visit returns None are silently dropped by the
          // `flatten` below instead of making the whole visit return None (as the join and
          // union branches do) - confirm this is intended.
          val children = process.getInputs
            .map(_.asInstanceOf[StreamPhysicalRel])
            .zipWithIndex
            .map {
              case (child, inputIndex) =>
                // For PTF without table arguments (i.e. values child)
                if (inputArgs.isEmpty) {
                  this.visit(child, UpdateKindTrait.NONE)
                }
                // Derive the required update trait for table arguments
                else {
                  val inputArg = inputArgs.get(inputIndex)
                  val (tableArg, tableArgCall, modifyKindSet) =
                    extractPtfTableArgComponents(process, child, inputArg)
                  // named differently from the method parameter `requiredUpdateTrait`, which
                  // is still needed below for the requirement of the parent
                  val requiredChildUpdateTrait =
                    if (
                      !modifyKindSet.isInsertOnly && tableArg.is(
                        StaticArgumentTrait.SUPPORT_UPDATES)
                    ) {
                      if (isPtfUpsert(tableArg, tableArgCall, child)) {
                        UpdateKindTrait.ONLY_UPDATE_AFTER
                      } else {
                        UpdateKindTrait.BEFORE_AND_AFTER
                      }
                    } else {
                      UpdateKindTrait.NONE
                    }
                  this.visit(child, requiredChildUpdateTrait)
                }
            }
            .toList
            .flatten
          // Query PTF for upsert vs. retract
          val providedUpdateTrait = queryPtfChangelogMode(
            process,
            children,
            toChangelogMode(process, Some(requiredUpdateTrait), None),
            UpdateKindTrait.fromChangelogMode,
            UpdateKindTrait.NONE)
          createNewNode(rel, Some(children), providedUpdateTrait)

        case _ =>
          throw new UnsupportedOperationException(
            s"Unsupported visit for ${rel.getClass.getSimpleName}")

      }