def visit()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [1065:1289]


    /**
     * Visits `rel` (recursing into its inputs) and rebuilds it with a [[DeleteKindTrait]] chosen
     * according to the operator type and the delete trait required by the parent.
     *
     * Each `case` below encodes, for one family of physical operators, which delete trait is
     * requested from the inputs and which delete trait the operator itself provides.
     *
     * @param rel
     *   the physical node to visit
     * @param requiredTrait
     *   the delete trait requested by the parent of `rel`. Only some branches consult it (PTF,
     *   join, calc, the transparent forwarders, union and changelog normalize); the other branches
     *   derive their requirements from the modify kind set of the node or of its inputs.
     * @return
     *   the rewritten node, or `None`. NOTE(review): `None` appears to signal that the
     *   requirement cannot be satisfied by this subtree (see the calc branch and the `orElse`
     *   fallbacks) -- confirm against `createNewNode` / `visitChildren`.
     * @throws UnsupportedOperationException
     *   if `rel` is of a type that is not handled by any branch
     */
    def visit(rel: StreamPhysicalRel, requiredTrait: DeleteKindTrait): Option[StreamPhysicalRel] =
      rel match {
        // Sinks determine their own requirements; the incoming requiredTrait is not consulted.
        case sink: StreamPhysicalSink =>
          val sinkRequiredTraits = inferSinkRequiredTraits(sink)
          visitSink(sink, sinkRequiredTraits)

        // Legacy sinks only offer a single candidate requirement: full deletes (or none),
        // derived from the modify kind set of the sink's input.
        case sink: StreamPhysicalLegacySink[_] =>
          val childModifyKindSet = getModifyKindSet(sink.getInput)
          val fullDelete = fullDeleteOrNone(childModifyKindSet)
          visitSink(sink, Seq(fullDelete))

        case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
            _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
            _: StreamPhysicalWindowAggregate | _: StreamPhysicalSort | _: StreamPhysicalRank |
            _: StreamPhysicalSortLimit | _: StreamPhysicalTemporalJoin |
            _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
            _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction |
            _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
            _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
            _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
            _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
          // if not explicitly supported, all operators require full deletes if there are updates
          val children = rel.getInputs.map {
            case child: StreamPhysicalRel =>
              val childModifyKindSet = getModifyKindSet(child)
              this.visit(child, fullDeleteOrNone(childModifyKindSet))
          }.toList
          // NOTE(review): `flatten` silently drops inputs whose visit returned None, so the new
          // node may be built with fewer children than `rel` has -- confirm this is intended.
          createNewNode(rel, Some(children.flatten), fullDeleteOrNone(getModifyKindSet(rel)))

        case process: StreamPhysicalProcessTableFunction =>
          // Required delete traits depend on the table argument declaration,
          // input traits, partition keys, and upsert keys
          val call = process.getCall
          val inputArgs = StreamPhysicalProcessTableFunction
            .getProvidedInputArgs(call)
          val children = process.getInputs
            .map(_.asInstanceOf[StreamPhysicalRel])
            .zipWithIndex
            .map {
              case (child, inputIndex) =>
                // For PTF without table arguments (i.e. values child)
                if (inputArgs.isEmpty) {
                  this.visit(child, DeleteKindTrait.NONE)
                }
                // Derive the required delete trait for table arguments
                else {
                  val inputArg = inputArgs.get(inputIndex)
                  val (tableArg, tableArgCall, modifyKindSet) =
                    extractPtfTableArgComponents(process, child, inputArg)
                  // Key-only deletes are only attempted when the argument supports updates, is
                  // consumed as an upsert, and does not explicitly demand full deletes.
                  if (
                    tableArg.is(StaticArgumentTrait.SUPPORT_UPDATES)
                    && isPtfUpsert(tableArg, tableArgCall, child)
                    && !tableArg.is(StaticArgumentTrait.REQUIRE_FULL_DELETE)
                  ) {
                    // prefer delete by key, fall back to full deletes if that yields None
                    this
                      .visit(child, deleteOnKeyOrNone(modifyKindSet))
                      .orElse(this.visit(child, fullDeleteOrNone(modifyKindSet)))
                  } else {
                    this.visit(child, fullDeleteOrNone(modifyKindSet))
                  }
                }
            }
            .toList
            // NOTE(review): as above, inputs whose visit returned None are dropped here.
            .flatten
          val modifyTrait = getModifyKindSet(rel)
          // Query the PTF for full vs. partial deletes
          // The callback maps the changelog mode reported by the PTF to a delete trait;
          // the last argument is presumably the default used when the PTF does not report a
          // mode -- TODO confirm against queryPtfChangelogMode.
          val providedDeleteTrait = queryPtfChangelogMode(
            process,
            children,
            toChangelogMode(process, None, Some(requiredTrait)),
            mode =>
              if (mode.keyOnlyDeletes()) {
                deleteOnKeyOrNone(modifyTrait)
              } else {
                fullDeleteOrNone(modifyTrait)
              },
            fullDeleteOrNone(modifyTrait)
          )
          createNewNode(process, Some(children), providedDeleteTrait)

        case join: StreamPhysicalJoin =>
          val children = join.getInputs.zipWithIndex.map {
            case (child, childOrdinal) =>
              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
              val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal)
              val inputModifyKindSet = getModifyKindSet(physicalChild)
              // Only ask an input for key-only deletes when the parent asked for them and this
              // input's unique key contains the join key; otherwise demand full deletes.
              if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) {
                // prefer delete by key, fall back to full deletes if that yields None
                this
                  .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
                  .orElse(this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet)))
              } else {
                this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
              }
          }
          // the join yields None as soon as one of its inputs yields None
          if (children.exists(_.isEmpty)) {
            None
          } else {
            val childRels = children.flatten.toList
            // if any visited input ended up with DELETE_BY_KEY, the join provides delete by key
            // (or none); otherwise it provides full deletes (or none)
            if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) {
              createNewNode(join, Some(childRels), deleteOnKeyOrNone(getModifyKindSet(rel)))
            } else {
              createNewNode(join, Some(childRels), fullDeleteOrNone(getModifyKindSet(rel)))
            }
          }

        // if the condition is applied on the upsert key, we can emit whatever the requiredTrait
        // is, because we will filter all records based on the condition that applies to that key
        case calc: StreamPhysicalCalcBase =>
          // conversely, delete by key is rejected (None) when isNonUpsertKeyCondition(calc) holds
          if (
            requiredTrait == DeleteKindTrait.DELETE_BY_KEY &&
            isNonUpsertKeyCondition(calc)
          ) {
            None
          } else {
            // otherwise, forward DeleteKind requirement
            visitChildren(rel, requiredTrait) match {
              case None => None
              case Some(children) =>
                // the calc provides whatever delete trait its first visited child carries
                val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
                createNewNode(rel, Some(children), childTrait)
            }
          }

        case _: StreamPhysicalExchange | _: StreamPhysicalExpand |
            _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalDropUpdateBefore =>
          // transparent forward requiredTrait to children
          visitChildren(rel, requiredTrait) match {
            case None => None
            case Some(children) =>
              // the node provides whatever delete trait its first visited child carries
              val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
              createNewNode(rel, Some(children), childTrait)
          }

        case union: StreamPhysicalUnion =>
          val children = union.getInputs.map {
            case child: StreamPhysicalRel =>
              val childModifyKindSet = getModifyKindSet(child)
              // inputs that never produce deletes are asked for NONE; the others receive the
              // parent's requirement unchanged
              val requiredChildTrait = if (!childModifyKindSet.contains(ModifyKind.DELETE)) {
                DeleteKindTrait.NONE
              } else {
                requiredTrait
              }
              this.visit(child, requiredChildTrait)
          }.toList

          // the union yields None as soon as one of its inputs yields None
          if (children.exists(_.isEmpty)) {
            None
          } else {
            val deleteKinds = children.flatten
              .map(_.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE))
            // union can just forward changes, can't actively satisfy to another changelog mode
            val providedTrait = if (deleteKinds.forall(k => DeleteKindTrait.NONE == k)) {
              // if all children are NONE, the union is NONE
              DeleteKindTrait.NONE
            } else {
              // otherwise, merge delete kinds.
              val merged = deleteKinds
                .map(_.deleteKind)
                .reduce {
                  (l, r) =>
                    (l, r) match {
                      // NONE is neutral: the other side's kind wins
                      case (DeleteKind.NONE, r: DeleteKind) => r
                      case (l: DeleteKind, DeleteKind.NONE) => l
                      case (l: DeleteKind, r: DeleteKind) =>
                        if (l == r) {
                          l
                        } else {
                          // if any of the union input produces DELETE_BY_KEY, the union produces
                          // delete by key
                          DeleteKind.DELETE_BY_KEY
                        }
                    }
                }
              new DeleteKindTrait(merged)
            }
            createNewNode(union, Some(children.flatten), providedTrait)
          }

        case normalize: StreamPhysicalChangelogNormalize =>
          // if
          // 1. we don't need to produce UPDATE_BEFORE,
          // 2. children can satisfy the required delete trait,
          // 3. the normalize doesn't have filter condition which we'd lose,
          // 4. we don't use metadata columns
          // we can skip ChangelogNormalize
          if (!ChangelogNormalizeRequirementResolver.isRequired(normalize)) {
            visitChildren(normalize, requiredTrait) match {
              case Some(children) =>
                // Drop the normalize node; when the visited child is an exchange, the exchange is
                // dropped as well and its input is returned instead.
                // NOTE(review): the non-exchange branch returns the ORIGINAL `normalize.getInput`
                // rather than the visited `children.head` -- confirm this is intended.
                val input = children.head match {
                  case exchange: StreamPhysicalExchange =>
                    exchange.getInput
                  case _ =>
                    normalize.getInput
                }
                // early exit: the remainder of this branch only runs when normalize is kept
                return Some(input.asInstanceOf[StreamPhysicalRel])
              // children cannot satisfy requiredTrait: fall through and keep the normalize node
              case _ =>
            }
          }
          val childModifyKindTrait = getModifyKindSet(rel.getInput(0))

          // prefer delete by key, but accept both
          val children = visitChildren(normalize, deleteOnKeyOrNone(childModifyKindTrait))
            .orElse(visitChildren(normalize, fullDeleteOrNone(childModifyKindTrait)))

          // changelog normalize produces full deletes
          createNewNode(rel, children, fullDeleteOrNone(getModifyKindSet(rel)))

        case ts: StreamPhysicalTableSourceScan =>
          // the provided delete trait is derived from the changelog mode declared by the source
          val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
          createNewNode(rel, Some(List()), providedTrait)

        // these leaf nodes are always given DeleteKindTrait.NONE
        case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
            _: StreamPhysicalValues =>
          createNewNode(rel, Some(List()), DeleteKindTrait.NONE)

        // leaf node: full deletes (or none), depending on the scan's own modify kind set
        case _: StreamPhysicalIntermediateTableScan =>
          createNewNode(rel, Some(List()), fullDeleteOrNone(getModifyKindSet(rel)))

        // every supported operator must be listed explicitly above
        case _ =>
          throw new UnsupportedOperationException(
            s"Unsupported visit for ${rel.getClass.getSimpleName}")

      }