def visit()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [152:450]


    /**
     * Infers the [[ModifyKindSetTrait]] of the given physical node and of its inputs.
     *
     * Almost every branch follows the same pattern: the inputs are visited first through
     * `visitChildren` with the trait this operator requires from them, then the trait this operator
     * provides is computed, and finally the provided trait together with the caller's
     * `requiredTrait` and `requester` is handed to `createNewNode`, whose result is returned. Leaf
     * scans pass an empty children list. Sinks are the exception: they ignore `requiredTrait` and
     * are copied with [[ModifyKindSetTrait.EMPTY]].
     *
     * NOTE(review): `createNewNode` and `visitChildren` are defined outside this method;
     * presumably `createNewNode` verifies that the provided trait satisfies the required one and
     * mentions `requester` in the error otherwise -- confirm against their definitions.
     *
     * @param rel
     *   the physical node to visit
     * @param requiredTrait
     *   the modify kinds the caller accepts from `rel`; ignored for sinks
     * @param requester
     *   name of the requesting node, forwarded to `createNewNode` (and to `visitChildren` by the
     *   pass-through branches) for use in messages
     * @return
     *   the node produced by `createNewNode`, or for sinks a copy of the sink with the visited
     *   inputs
     * @throws UnsupportedOperationException
     *   if `rel` is a legacy sink wrapping an unknown sink type, or a node type that has no branch
     *   in this method
     */
    def visit(
        rel: StreamPhysicalRel,
        requiredTrait: ModifyKindSetTrait,
        requester: String): StreamPhysicalRel = rel match {
      case sink: StreamPhysicalSink =>
        val name = s"Table sink '${sink.contextResolvedTable.getIdentifier.asSummaryString()}'"
        val queryModifyKindSet = deriveQueryDefaultChangelogMode(sink.getInput, name)
        // the sink decides which changes it accepts based on what the query would produce
        val sinkRequiredTrait =
          ModifyKindSetTrait.fromChangelogMode(sink.tableSink.getChangelogMode(queryModifyKindSet))
        val children = visitChildren(sink, sinkRequiredTrait, name)
        val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
        // ignore required trait from context, because sink is the true root
        sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]

      case sink: StreamPhysicalLegacySink[_] =>
        // NOTE: the order of these cases matters, the first matching sink interface wins
        val (sinkRequiredTrait, name) = sink.sink match {
          case _: UpsertStreamTableSink[_] =>
            (ModifyKindSetTrait.ALL_CHANGES, "UpsertStreamTableSink")
          case _: RetractStreamTableSink[_] =>
            (ModifyKindSetTrait.ALL_CHANGES, "RetractStreamTableSink")
          case _: AppendStreamTableSink[_] =>
            (ModifyKindSetTrait.INSERT_ONLY, "AppendStreamTableSink")
          case _: StreamTableSink[_] =>
            (ModifyKindSetTrait.INSERT_ONLY, "StreamTableSink")
          case ds: DataStreamTableSink[_] =>
            if (ds.withChangeFlag) {
              (ModifyKindSetTrait.ALL_CHANGES, "toRetractStream")
            } else {
              (ModifyKindSetTrait.INSERT_ONLY, "toAppendStream")
            }
          case _ =>
            throw new UnsupportedOperationException(
              s"Unsupported sink '${sink.sink.getClass.getSimpleName}'")
        }
        val children = visitChildren(sink, sinkRequiredTrait, name)
        val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
        // ignore required trait from context, because sink is the true root
        sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]

      case agg: StreamPhysicalGroupAggregate =>
        // agg support all changes in input
        val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
        val providedTrait = deriveGroupAggregateProvidedTrait(getModifyKindSet(children.head))
        createNewNode(agg, children, providedTrait, requiredTrait, requester)

      case tagg: StreamPhysicalGroupTableAggregateBase =>
        // table agg support all changes in input
        val children = visitChildren(tagg, ModifyKindSetTrait.ALL_CHANGES)
        // table aggregate will produce all changes, including deletions
        createNewNode(tagg, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

      case agg: StreamPhysicalPythonGroupAggregate =>
        // agg support all changes in input
        val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
        val providedTrait = deriveGroupAggregateProvidedTrait(getModifyKindSet(children.head))
        createNewNode(agg, children, providedTrait, requiredTrait, requester)

      case window: StreamPhysicalGroupWindowAggregateBase =>
        // group window (table) aggregates support all changes in input
        val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
        val builder = ModifyKindSet
          .newBuilder()
          .addContainedKind(ModifyKind.INSERT)
        // updates are only produced when the emit strategy says so
        if (window.emitStrategy.produceUpdates) {
          builder.addContainedKind(ModifyKind.UPDATE)
        }
        val providedTrait = new ModifyKindSetTrait(builder.build())
        createNewNode(window, children, providedTrait, requiredTrait, requester)

      case window: StreamPhysicalWindowAggregate =>
        // WindowAggregate supports all changes in input
        val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
        // TODO support early / late fire and then this node may produce update records
        val providedTrait = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)
        createNewNode(window, children, providedTrait, requiredTrait, requester)

      case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate =>
        // WindowRank and WindowDeduplicate only support insert-only input
        // and produce insert-only changes
        val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
        val providedTrait = ModifyKindSetTrait.INSERT_ONLY
        createNewNode(rel, children, providedTrait, requiredTrait, requester)

      case rank: StreamPhysicalRank if RankUtil.isDeduplication(rank) =>
        val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
        val tableConfig = unwrapTableConfig(rank)

        // if the rank is deduplication and can be executed as insert-only, forward that information
        val insertOnly = children.forall(ChangelogPlanUtils.isInsertOnly)

        val providedTrait = {
          if (
            insertOnly && RankUtil.outputInsertOnlyInDeduplicate(
              tableConfig,
              RankUtil.keepLastDeduplicateRow(rank.orderKey))
          ) {
            // Deduplicate outputs append only if first row is kept and mini batching is disabled
            ModifyKindSetTrait.INSERT_ONLY
          } else {
            ModifyKindSetTrait.ALL_CHANGES
          }
        }

        createNewNode(rel, children, providedTrait, requiredTrait, requester)

      case _: StreamPhysicalRank =>
        // every rank reaching this case is not a deduplication (handled by the case above)
        // Rank supports consuming all changes
        val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
        createNewNode(rel, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

      case limit: StreamPhysicalLimit =>
        // limit support all changes in input
        val children = visitChildren(limit, ModifyKindSetTrait.ALL_CHANGES)
        // limit stays insert-only for insert-only input, otherwise it produces all changes
        val providedTrait = if (getModifyKindSet(children.head).isInsertOnly) {
          ModifyKindSetTrait.INSERT_ONLY
        } else {
          ModifyKindSetTrait.ALL_CHANGES
        }
        createNewNode(limit, children, providedTrait, requiredTrait, requester)

      case _: StreamPhysicalSortLimit =>
        // SortLimit supports consuming all changes
        val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
        createNewNode(rel, children, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

      case sort: StreamPhysicalSort =>
        // Sort supports consuming all changes
        val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
        // Sort will buffer all inputs, and produce insert-only messages when input is finished
        createNewNode(sort, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

      case cep: StreamPhysicalMatch =>
        // CEP only supports consuming insert-only and producing insert-only changes
        // give a better requester name for exception message
        val children = visitChildren(cep, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize")
        createNewNode(cep, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

      case over: StreamPhysicalOverAggregate =>
        // OverAggregate can only support insert for row-time/proc-time sort keys
        val logicWindow = over.logicWindow
        val groups = logicWindow.groups
        // All aggregates are computed over the same window and order by is supported for only
        // 1 field, hence only the first collation of the first group is inspected
        val sortsOnNonTimeAttribute =
          if (groups.isEmpty || groups.get(0).orderKeys.getFieldCollations.isEmpty) {
            false
          } else {
            val orderKeyIndex = groups.get(0).orderKeys.getFieldCollations.get(0).getFieldIndex
            val orderKeyType = logicWindow.getRowType.getFieldList.get(orderKeyIndex).getType
            !FlinkTypeFactory.isRowtimeIndicatorType(orderKeyType) &&
            !FlinkTypeFactory.isProctimeIndicatorType(orderKeyType)
          }
        val overRequiredTrait =
          if (sortsOnNonTimeAttribute) {
            ModifyKindSetTrait.ALL_CHANGES
          } else {
            ModifyKindSetTrait.INSERT_ONLY
          }
        val builder = ModifyKindSet
          .newBuilder()
          .addContainedKind(ModifyKind.INSERT)
        if (sortsOnNonTimeAttribute) {
          // Only non row-time/proc-time sort can support UPDATES
          builder.addContainedKind(ModifyKind.UPDATE)
          builder.addContainedKind(ModifyKind.DELETE)
        }
        val children = visitChildren(over, overRequiredTrait)
        val providedTrait = new ModifyKindSetTrait(builder.build())
        createNewNode(over, children, providedTrait, requiredTrait, requester)

      case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
          _: StreamPhysicalPythonOverAggregate =>
        // TemporalSort, IntervalJoin and PythonOverAggregate only support consuming insert-only
        // and producing insert-only changes
        val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
        createNewNode(rel, children, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

      case join: StreamPhysicalJoin =>
        // join support all changes in input
        val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
        val leftKindSet = getModifyKindSet(children.head)
        val rightKindSet = getModifyKindSet(children.last)
        val innerOrSemi = join.joinSpec.getJoinType == FlinkJoinType.INNER ||
          join.joinSpec.getJoinType == FlinkJoinType.SEMI
        val providedTrait = if (innerOrSemi) {
          // forward left and right modify operations
          new ModifyKindSetTrait(leftKindSet.union(rightKindSet))
        } else {
          // otherwise, it may produce any kinds of changes
          ModifyKindSetTrait.ALL_CHANGES
        }
        createNewNode(join, children, providedTrait, requiredTrait, requester)

      case windowJoin: StreamPhysicalWindowJoin =>
        // Currently, window join only supports INSERT_ONLY in input
        val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
        createNewNode(
          windowJoin,
          children,
          ModifyKindSetTrait.INSERT_ONLY,
          requiredTrait,
          requester)

      case temporalJoin: StreamPhysicalTemporalJoin =>
        // currently, temporal join supports all kinds of changes, including right side
        val children = visitChildren(temporalJoin, ModifyKindSetTrait.ALL_CHANGES)
        // forward left input changes
        val leftTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
        createNewNode(temporalJoin, children, leftTrait, requiredTrait, requester)

      case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
          _: StreamPhysicalLookupJoin | _: StreamPhysicalExchange | _: StreamPhysicalExpand |
          _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner |
          _: StreamPhysicalWindowTableFunction =>
        // transparent forward requiredTrait to children
        val children = visitChildren(rel, requiredTrait, requester)
        val childrenTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
        // forward children mode
        createNewNode(rel, children, childrenTrait, requiredTrait, requester)

      case union: StreamPhysicalUnion =>
        // transparent forward requiredTrait to children
        val children = visitChildren(rel, requiredTrait, requester)
        // union provides every kind of change that any of its children provides
        val providedKindSet = ModifyKindSet.union(children.map(getModifyKindSet): _*)
        createNewNode(
          union,
          children,
          new ModifyKindSetTrait(providedKindSet),
          requiredTrait,
          requester)

      case normalize: StreamPhysicalChangelogNormalize =>
        // changelog normalize support update&delete input
        val children = visitChildren(normalize, ModifyKindSetTrait.ALL_CHANGES)
        // changelog normalize will output all changes
        val providedTrait = ModifyKindSetTrait.ALL_CHANGES
        createNewNode(normalize, children, providedTrait, requiredTrait, requester)

      case ts: StreamPhysicalTableSourceScan =>
        // the provided changes are whatever changelog mode the table source declares
        val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
        createNewNode(ts, List(), providedTrait, requiredTrait, requester)

      case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
          _: StreamPhysicalValues =>
        // DataStream, legacy TableSource and Values only support producing insert-only messages
        createNewNode(rel, List(), ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

      case scan: StreamPhysicalIntermediateTableScan =>
        // forward the modify kinds recorded on the intermediate table
        val providedTrait = new ModifyKindSetTrait(scan.intermediateTable.modifyKindSet)
        createNewNode(scan, List(), providedTrait, requiredTrait, requester)

      case process: StreamPhysicalProcessTableFunction =>
        // Accepted changes depend on table argument declaration
        val requiredChildrenTraits = StreamPhysicalProcessTableFunction
          .getProvidedInputArgs(process.getCall)
          .map(arg => arg.e)
          .map(
            arg =>
              if (arg.is(StaticArgumentTrait.SUPPORT_UPDATES)) {
                ModifyKindSetTrait.ALL_CHANGES
              } else {
                ModifyKindSetTrait.INSERT_ONLY
              })
          .toList
        val children = if (requiredChildrenTraits.isEmpty) {
          // Constant function has a single StreamPhysicalValues input
          visitChildren(process, ModifyKindSetTrait.INSERT_ONLY)
        } else {
          visitChildren(process, requiredChildrenTraits)
        }
        // Query PTF for updating vs. non-updating
        val providedModifyTrait = queryPtfChangelogMode(
          process,
          children,
          requiredTrait.modifyKindSet.toChangelogModeBuilder.build(),
          ModifyKindSetTrait.fromChangelogMode,
          ModifyKindSetTrait.INSERT_ONLY)
        createNewNode(process, children, providedModifyTrait, requiredTrait, requester)

      case _ =>
        throw new UnsupportedOperationException(
          s"Unsupported visit for ${rel.getClass.getSimpleName}")
    }

    /**
     * Computes the trait provided by a group aggregate (regular or Python) from the modify kinds
     * of its input: INSERT and UPDATE are always contained, DELETE is added when the input
     * contains UPDATE or DELETE.
     *
     * @param inputModifyKindSet
     *   the modify kinds of the aggregate's input
     * @return
     *   the trait describing the changes the aggregate produces
     */
    private def deriveGroupAggregateProvidedTrait(
        inputModifyKindSet: ModifyKindSet): ModifyKindSetTrait = {
      val builder = ModifyKindSet
        .newBuilder()
        .addContainedKind(ModifyKind.INSERT)
        .addContainedKind(ModifyKind.UPDATE)
      if (
        inputModifyKindSet.contains(ModifyKind.UPDATE) ||
        inputModifyKindSet.contains(ModifyKind.DELETE)
      ) {
        builder.addContainedKind(ModifyKind.DELETE)
      }
      new ModifyKindSetTrait(builder.build())
    }