in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [560:858]
/**
 * Visits `rel` and, recursively, its inputs, and tries to build a replacement node for which
 * the parent's `requiredUpdateTrait` can be honored.
 *
 * Every branch decides which [[UpdateKindTrait]] it has to require from its inputs, visits the
 * inputs with that requirement and finally rebuilds the node through `createNewNode` together
 * with the update kind trait the node itself provides.
 *
 * @param rel
 *   the physical node to visit
 * @param requiredUpdateTrait
 *   the update kind trait requested by the parent of `rel`
 * @return
 *   the rebuilt node, or `None` when this node or one of its inputs reports that the
 *   requirement cannot be satisfied
 * @throws UnsupportedOperationException
 *   if `rel` is of a node type that is not handled by any branch
 */
def visit(
    rel: StreamPhysicalRel,
    requiredUpdateTrait: UpdateKindTrait): Option[StreamPhysicalRel] =
  rel match {
    case sink: StreamPhysicalSink =>
      val sinkRequiredTraits = inferSinkRequiredTraits(sink)
      val upsertMaterialize = analyzeUpsertMaterializeStrategy(sink)
      visitSink(sink.copy(upsertMaterialize), sinkRequiredTraits)

    case sink: StreamPhysicalLegacySink[_] =>
      val childModifyKindSet = getModifyKindSet(sink.getInput)
      val onlyAfter = onlyAfterOrNone(childModifyKindSet)
      val beforeAndAfter = beforeAfterOrNone(childModifyKindSet)
      // candidate traits are listed in order of preference
      val sinkRequiredTraits = sink.sink match {
        case _: UpsertStreamTableSink[_] =>
          // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
          Seq(onlyAfter, beforeAndAfter)
        case _: RetractStreamTableSink[_] =>
          Seq(beforeAndAfter)
        case _: AppendStreamTableSink[_] | _: StreamTableSink[_] =>
          Seq(UpdateKindTrait.NONE)
        case ds: DataStreamTableSink[_] =>
          if (ds.withChangeFlag) {
            if (ds.needUpdateBefore) {
              Seq(beforeAndAfter)
            } else {
              // support both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
              Seq(onlyAfter, beforeAndAfter)
            }
          } else {
            Seq(UpdateKindTrait.NONE)
          }
      }
      visitSink(sink, sinkRequiredTraits)

    case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
        _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
        _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
        _: StreamPhysicalWindowAggregate | _: StreamPhysicalOverAggregate =>
      // Aggregate, TableAggregate, OverAggregate, Limit, GroupWindowAggregate, WindowAggregate,
      // and WindowTableAggregate requires update_before if there are updates
      val requiredChildUpdateTrait = beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
      val children = visitChildren(rel, requiredChildUpdateTrait)
      // use requiredTrait as providedTrait, because they should support all kinds of UpdateKind
      createNewNode(rel, children, requiredUpdateTrait)

    case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
        _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch | _: StreamPhysicalIntervalJoin |
        _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
      // WindowRank, WindowDeduplicate, TemporalSort, CEP (Match), IntervalJoin,
      // PythonOverAggregate and WindowJoin require nothing about UpdateKind.
      val children = visitChildren(rel, UpdateKindTrait.NONE)
      createNewNode(rel, children, requiredUpdateTrait)

    case rank: StreamPhysicalRank =>
      val rankStrategies =
        RankProcessStrategy.analyzeRankProcessStrategies(rank, rank.partitionKey, rank.orderKey)
      visitRankStrategies(
        rankStrategies,
        requiredUpdateTrait,
        rankStrategy => rank.copy(rankStrategy))

    case sortLimit: StreamPhysicalSortLimit =>
      // a sort-limit is analyzed like a rank with an empty partition key
      val rankStrategies = RankProcessStrategy.analyzeRankProcessStrategies(
        sortLimit,
        ImmutableBitSet.of(),
        sortLimit.getCollation)
      visitRankStrategies(
        rankStrategies,
        requiredUpdateTrait,
        rankStrategy => sortLimit.copy(rankStrategy))

    case sort: StreamPhysicalSort =>
      val requiredChildTrait = beforeAfterOrNone(getModifyKindSet(sort.getInput))
      val children = visitChildren(sort, requiredChildTrait)
      createNewNode(sort, children, requiredUpdateTrait)

    case join: StreamPhysicalJoin =>
      val onlyAfterByParent = requiredUpdateTrait.updateKind == UpdateKind.ONLY_UPDATE_AFTER
      val children = join.getInputs.zipWithIndex.map {
        case (child, childOrdinal) =>
          val physicalChild = child.asInstanceOf[StreamPhysicalRel]
          val supportOnlyAfter = join.inputUniqueKeyContainsJoinKey(childOrdinal)
          val inputModifyKindSet = getModifyKindSet(physicalChild)
          if (onlyAfterByParent) {
            if (inputModifyKindSet.contains(ModifyKind.UPDATE) && !supportOnlyAfter) {
              // the parent requires only-after, however, the join doesn't support this
              None
            } else {
              this.visit(physicalChild, onlyAfterOrNone(inputModifyKindSet))
            }
          } else {
            this.visit(physicalChild, beforeAfterOrNone(inputModifyKindSet))
          }
      }
      if (children.exists(_.isEmpty)) {
        None
      } else {
        createNewNode(join, Some(children.flatten.toList), requiredUpdateTrait)
      }

    case temporalJoin: StreamPhysicalTemporalJoin =>
      val left = temporalJoin.getLeft.asInstanceOf[StreamPhysicalRel]
      val right = temporalJoin.getRight.asInstanceOf[StreamPhysicalRel]
      // the left input required trait depends on its parent in temporal join
      // the left input will send message to parent
      val requiredUpdateBeforeByParent =
        requiredUpdateTrait.updateKind == UpdateKind.BEFORE_AND_AFTER
      val leftInputModifyKindSet = getModifyKindSet(left)
      val leftRequiredTrait = if (requiredUpdateBeforeByParent) {
        beforeAfterOrNone(leftInputModifyKindSet)
      } else {
        onlyAfterOrNone(leftInputModifyKindSet)
      }
      val newLeftOption = this.visit(left, leftRequiredTrait)
      val rightInputModifyKindSet = getModifyKindSet(right)
      // currently temporal join support changelog stream as the right side
      // so it supports both ONLY_AFTER and BEFORE_AFTER, but prefer ONLY_AFTER
      val newRightOption = this.visit(right, onlyAfterOrNone(rightInputModifyKindSet)) match {
        case Some(newRight) => Some(newRight)
        case None =>
          // fall back to BEFORE_AFTER when the right side cannot satisfy ONLY_AFTER
          val beforeAfter = beforeAfterOrNone(rightInputModifyKindSet)
          this.visit(right, beforeAfter)
      }
      (newLeftOption, newRightOption) match {
        case (Some(newLeft), Some(newRight)) =>
          // the temporal join provides whatever update kind its rewritten left input carries
          val leftTrait = newLeft.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
          createNewNode(temporalJoin, Some(List(newLeft, newRight)), leftTrait)
        case _ =>
          None
      }

    case calc: StreamPhysicalCalcBase =>
      // if the condition is applied on the upsert key, we can emit whatever the requiredTrait
      // is, because we will filter all records based on the condition that applies to that key
      if (
        requiredUpdateTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
        isNonUpsertKeyCondition(calc)
      ) {
        // we don't expect filter to satisfy ONLY_UPDATE_AFTER update kind,
        // to solve the bad case like a single 'cnt < 10' condition after aggregation.
        // See FLINK-9528.
        None
      } else {
        // otherwise, forward UpdateKind requirement
        visitChildren(rel, requiredUpdateTrait) match {
          case None => None
          case Some(children) =>
            val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
            createNewNode(rel, Some(children), childTrait)
        }
      }

    case _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
        _: StreamPhysicalExchange | _: StreamPhysicalExpand |
        _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner |
        _: StreamPhysicalWindowTableFunction =>
      // transparent forward requiredTrait to children
      visitChildren(rel, requiredUpdateTrait) match {
        case None => None
        case Some(children) =>
          // provide the same update kind as the (first) rewritten input
          val childTrait = children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
          createNewNode(rel, Some(children), childTrait)
      }

    case union: StreamPhysicalUnion =>
      val children = union.getInputs.map {
        case child: StreamPhysicalRel =>
          val childModifyKindSet = getModifyKindSet(child)
          val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
            UpdateKindTrait.NONE
          } else {
            requiredUpdateTrait
          }
          this.visit(child, requiredChildTrait)
      }.toList
      if (children.exists(_.isEmpty)) {
        None
      } else {
        val newChildren = children.flatten
        val updateKinds = newChildren
          .map(_.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE))
        // union can just forward changes, can't actively satisfy to another changelog mode
        val providedTraitOption: Option[UpdateKindTrait] =
          if (updateKinds.forall(k => UpdateKindTrait.NONE == k)) {
            // if all the children is NO_UPDATE, union is NO_UPDATE
            Some(UpdateKindTrait.NONE)
          } else {
            // otherwise, merge update kinds; the merge yields None as soon as two inputs
            // disagree, which replaces a nonlocal `return` from inside the lambda
            val merged = updateKinds
              .map(kindTrait => (Some(kindTrait.updateKind): Option[UpdateKind]))
              .reduce[Option[UpdateKind]] {
                (l, r) =>
                  (l, r) match {
                    case (Some(UpdateKind.NONE), _) => r
                    case (_, Some(UpdateKind.NONE)) => l
                    case (Some(leftKind), Some(rightKind)) if leftKind == rightKind => l
                    // UNION doesn't support to union ONLY_UPDATE_AFTER and BEFORE_AND_AFTER inputs
                    case _ => None
                  }
              }
            merged.map(kind => new UpdateKindTrait(kind))
          }
        providedTraitOption.flatMap {
          providedTrait => createNewNode(union, Some(newChildren), providedTrait)
        }
      }

    case normalize: StreamPhysicalChangelogNormalize =>
      // changelog normalize currently only supports input only sending UPDATE_AFTER
      val children = visitChildren(normalize, UpdateKindTrait.ONLY_UPDATE_AFTER)
      // use requiredTrait as providedTrait,
      // because changelog normalize supports all kinds of UpdateKind
      createNewNode(rel, children, requiredUpdateTrait)

    case ts: StreamPhysicalTableSourceScan =>
      // currently only support BEFORE_AND_AFTER if source produces updates
      val providedTrait = UpdateKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
      val newSource = createNewNode(rel, Some(List()), providedTrait)
      if (
        providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
        requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
      ) {
        // requiring only-after, but the source is CDC source, then drop update_before manually
        val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, rel.getTraitSet, rel)
        // the rewritten source is handed to createNewNode as the input of the drop node
        createNewNode(dropUB, newSource.map(s => List(s)), requiredUpdateTrait)
      } else {
        newSource
      }

    case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
        _: StreamPhysicalValues =>
      createNewNode(rel, Some(List()), UpdateKindTrait.NONE)

    case scan: StreamPhysicalIntermediateTableScan =>
      val providedTrait = if (scan.intermediateTable.isUpdateBeforeRequired) {
        // we can't drop UPDATE_BEFORE if it is required by other parent blocks
        UpdateKindTrait.BEFORE_AND_AFTER
      } else {
        requiredUpdateTrait
      }
      if (!providedTrait.satisfies(requiredUpdateTrait)) {
        // require ONLY_AFTER but can only provide BEFORE_AND_AFTER
        None
      } else {
        createNewNode(rel, Some(List()), providedTrait)
      }

    case process: StreamPhysicalProcessTableFunction =>
      // Required update traits depend on the table argument declaration,
      // input traits, partition keys, and upsert keys
      val inputArgs = StreamPhysicalProcessTableFunction
        .getProvidedInputArgs(process.getCall)
      val visitedChildren = process.getInputs
        .map(_.asInstanceOf[StreamPhysicalRel])
        .zipWithIndex
        .map {
          case (child, inputIndex) =>
            if (inputArgs.isEmpty) {
              // For PTF without table arguments (i.e. values child)
              this.visit(child, UpdateKindTrait.NONE)
            } else {
              // Derive the required update trait for table arguments
              val inputArg = inputArgs.get(inputIndex)
              val (tableArg, tableArgCall, modifyKindSet) =
                extractPtfTableArgComponents(process, child, inputArg)
              // named differently from the method parameter, which is still needed below
              val requiredChildTrait =
                if (
                  !modifyKindSet.isInsertOnly && tableArg.is(
                    StaticArgumentTrait.SUPPORT_UPDATES)
                ) {
                  if (isPtfUpsert(tableArg, tableArgCall, child)) {
                    UpdateKindTrait.ONLY_UPDATE_AFTER
                  } else {
                    UpdateKindTrait.BEFORE_AND_AFTER
                  }
                } else {
                  UpdateKindTrait.NONE
                }
              this.visit(child, requiredChildTrait)
            }
        }
        .toList
      if (visitedChildren.exists(_.isEmpty)) {
        // an input could not satisfy its requirement; report that like join and union do
        // instead of silently rebuilding the function with fewer inputs
        None
      } else {
        val children = visitedChildren.flatten
        // Query PTF for upsert vs. retract
        val providedUpdateTrait = queryPtfChangelogMode(
          process,
          children,
          toChangelogMode(process, Some(requiredUpdateTrait), None),
          UpdateKindTrait.fromChangelogMode,
          UpdateKindTrait.NONE)
        createNewNode(rel, Some(children), providedUpdateTrait)
      }

    case _ =>
      throw new UnsupportedOperationException(
        s"Unsupported visit for ${rel.getClass.getSimpleName}")
  }