in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [1065:1289]
/**
 * Tries to rewrite `rel` and, recursively, its inputs so that `rel` carries a [[DeleteKindTrait]]
 * compatible with `requiredTrait`.
 *
 * <p>Each branch decides which delete kind it requests from its inputs: operators without explicit
 * support request full deletes, joins and process table functions may request deletes by key,
 * transparent operators forward `requiredTrait`, and sinks ignore `requiredTrait` and derive their
 * own requirement.
 *
 * <p>Not every branch checks its result against `requiredTrait` (table sources, for example, report
 * the delete kind derived from their changelog mode regardless of the request), so callers should
 * still check the trait of the returned node.
 *
 * @param rel
 *   the node that should satisfy `requiredTrait`
 * @param requiredTrait
 *   the delete kind requested by the parent of `rel`
 * @return
 *   the rewritten node, or None if `rel` or one of its inputs could not be rewritten
 * @throws UnsupportedOperationException
 *   if `rel` is a node type this visitor does not handle
 */
def visit(rel: StreamPhysicalRel, requiredTrait: DeleteKindTrait): Option[StreamPhysicalRel] = {

  // Inputs of `node` as physical stream nodes (the plan is expected to be fully physical).
  def physicalInputs(node: StreamPhysicalRel): List[StreamPhysicalRel] =
    node.getInputs.map(_.asInstanceOf[StreamPhysicalRel]).toList

  // All rewritten inputs, or None if any input could not be rewritten. Dropping a failed input
  // instead would copy the parent with fewer inputs than it requires.
  def allOrNone(children: Seq[Option[StreamPhysicalRel]]): Option[List[StreamPhysicalRel]] =
    if (children.exists(_.isEmpty)) None else Some(children.flatten.toList)

  // Forwards `requiredTrait` to the inputs of `node`; `node` then provides the delete kind of its
  // first rewritten input.
  def forwardToChildren(node: StreamPhysicalRel): Option[StreamPhysicalRel] =
    visitChildren(node, requiredTrait).flatMap {
      children =>
        val childTrait = children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
        createNewNode(node, Some(children), childTrait)
    }

  rel match {
    case sink: StreamPhysicalSink =>
      val sinkRequiredTraits = inferSinkRequiredTraits(sink)
      visitSink(sink, sinkRequiredTraits)

    case sink: StreamPhysicalLegacySink[_] =>
      val childModifyKindSet = getModifyKindSet(sink.getInput)
      val fullDelete = fullDeleteOrNone(childModifyKindSet)
      visitSink(sink, Seq(fullDelete))

    case _: StreamPhysicalGroupAggregate | _: StreamPhysicalGroupTableAggregate |
        _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
        _: StreamPhysicalPythonGroupTableAggregate | _: StreamPhysicalGroupWindowAggregateBase |
        _: StreamPhysicalWindowAggregate | _: StreamPhysicalSort | _: StreamPhysicalRank |
        _: StreamPhysicalSortLimit | _: StreamPhysicalTemporalJoin |
        _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
        _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction |
        _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
        _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
        _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
        _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin =>
      // Operators without explicit support request full deletes from their inputs
      // (fullDeleteOrNone) and provide full deletes themselves.
      val children = physicalInputs(rel).map {
        child => this.visit(child, fullDeleteOrNone(getModifyKindSet(child)))
      }
      createNewNode(rel, allOrNone(children), fullDeleteOrNone(getModifyKindSet(rel)))

    case process: StreamPhysicalProcessTableFunction =>
      // Required delete traits depend on the table argument declaration,
      // input traits, partition keys, and upsert keys
      val inputArgs = StreamPhysicalProcessTableFunction.getProvidedInputArgs(process.getCall)
      val children = physicalInputs(process).zipWithIndex.map {
        case (child, _) if inputArgs.isEmpty =>
          // PTF without table arguments (i.e. a values child)
          this.visit(child, DeleteKindTrait.NONE)
        case (child, inputIndex) =>
          // Derive the required delete trait from the table argument
          val (tableArg, tableArgCall, childModifyKindSet) =
            extractPtfTableArgComponents(process, child, inputArgs.get(inputIndex))
          val acceptsDeleteByKey =
            tableArg.is(StaticArgumentTrait.SUPPORT_UPDATES) &&
              isPtfUpsert(tableArg, tableArgCall, child) &&
              !tableArg.is(StaticArgumentTrait.REQUIRE_FULL_DELETE)
          if (acceptsDeleteByKey) {
            // prefer deletes by key, fall back to full deletes
            this
              .visit(child, deleteOnKeyOrNone(childModifyKindSet))
              .orElse(this.visit(child, fullDeleteOrNone(childModifyKindSet)))
          } else {
            this.visit(child, fullDeleteOrNone(childModifyKindSet))
          }
      }
      allOrNone(children).flatMap {
        newChildren =>
          val outputModifyKindSet = getModifyKindSet(rel)
          // Query the PTF for full vs. partial deletes
          val providedDeleteTrait = queryPtfChangelogMode(
            process,
            newChildren,
            toChangelogMode(process, None, Some(requiredTrait)),
            mode =>
              if (mode.keyOnlyDeletes()) {
                deleteOnKeyOrNone(outputModifyKindSet)
              } else {
                fullDeleteOrNone(outputModifyKindSet)
              },
            fullDeleteOrNone(outputModifyKindSet)
          )
          createNewNode(process, Some(newChildren), providedDeleteTrait)
      }

    case join: StreamPhysicalJoin =>
      val children = physicalInputs(join).zipWithIndex.map {
        case (child, childOrdinal) =>
          val supportsDeleteByKey = join.inputUniqueKeyContainsJoinKey(childOrdinal)
          val inputModifyKindSet = getModifyKindSet(child)
          if (supportsDeleteByKey && requiredTrait == DeleteKindTrait.DELETE_BY_KEY) {
            // prefer deletes by key, fall back to full deletes
            this
              .visit(child, deleteOnKeyOrNone(inputModifyKindSet))
              .orElse(this.visit(child, fullDeleteOrNone(inputModifyKindSet)))
          } else {
            this.visit(child, fullDeleteOrNone(inputModifyKindSet))
          }
      }
      allOrNone(children).flatMap {
        childRels =>
          val outputModifyKindSet = getModifyKindSet(rel)
          // the join provides deletes by key as soon as one of its rewritten inputs does
          val providedTrait =
            if (childRels.exists(r => getDeleteKind(r) == DeleteKind.DELETE_BY_KEY)) {
              deleteOnKeyOrNone(outputModifyKindSet)
            } else {
              fullDeleteOrNone(outputModifyKindSet)
            }
          createNewNode(join, Some(childRels), providedTrait)
      }

    // If the condition is applied on the upsert key, the calc can forward whatever the
    // requiredTrait is, because all records of a key are filtered alike. If the condition
    // references other columns, deletes by key are refused.
    case calc: StreamPhysicalCalcBase =>
      if (requiredTrait == DeleteKindTrait.DELETE_BY_KEY && isNonUpsertKeyCondition(calc)) {
        None
      } else {
        forwardToChildren(calc)
      }

    case _: StreamPhysicalExchange | _: StreamPhysicalExpand |
        _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalDropUpdateBefore =>
      // transparently forward requiredTrait to children
      forwardToChildren(rel)

    case union: StreamPhysicalUnion =>
      val children = physicalInputs(union).map {
        child =>
          // inputs that never delete are asked for NONE, the others get the union's requirement
          val requiredChildTrait =
            if (getModifyKindSet(child).contains(ModifyKind.DELETE)) {
              requiredTrait
            } else {
              DeleteKindTrait.NONE
            }
          this.visit(child, requiredChildTrait)
      }
      allOrNone(children).flatMap {
        newChildren =>
          val deleteKinds =
            newChildren.map(_.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE))
          // union can only forward changes, it can't actively satisfy another changelog mode
          val providedTrait = if (deleteKinds.forall(k => DeleteKindTrait.NONE == k)) {
            // all inputs provide NONE, so the union provides NONE
            DeleteKindTrait.NONE
          } else {
            // merge delete kinds: NONE is neutral, equal kinds are kept, and differing kinds
            // merge into DELETE_BY_KEY (any input producing deletes by key makes the union do so)
            val merged = deleteKinds
              .map(_.deleteKind)
              .reduce {
                (left, right) =>
                  if (left == DeleteKind.NONE) {
                    right
                  } else if (right == DeleteKind.NONE || left == right) {
                    left
                  } else {
                    DeleteKind.DELETE_BY_KEY
                  }
              }
            new DeleteKindTrait(merged)
          }
          createNewNode(union, Some(newChildren), providedTrait)
      }

    case normalize: StreamPhysicalChangelogNormalize =>
      // if
      // 1. we don't need to produce UPDATE_BEFORE,
      // 2. children can satisfy the required delete trait,
      // 3. the normalize doesn't have filter condition which we'd lose,
      // 4. we don't use metadata columns
      // we can skip ChangelogNormalize
      val withoutNormalize =
        if (ChangelogNormalizeRequirementResolver.isRequired(normalize)) {
          None
        } else {
          visitChildren(normalize, requiredTrait).map {
            children =>
              children.head match {
                // an exchange directly below the normalize is removed together with it
                case exchange: StreamPhysicalExchange =>
                  exchange.getInput.asInstanceOf[StreamPhysicalRel]
                // keep the rewritten input (not the original one) so its delete trait is kept
                case newInput => newInput
              }
          }
        }
      withoutNormalize.orElse {
        val childModifyKindSet = getModifyKindSet(normalize.getInput(0))
        // prefer delete by key, but accept both
        val children = visitChildren(normalize, deleteOnKeyOrNone(childModifyKindSet))
          .orElse(visitChildren(normalize, fullDeleteOrNone(childModifyKindSet)))
        // changelog normalize produces full deletes
        createNewNode(normalize, children, fullDeleteOrNone(getModifyKindSet(normalize)))
      }

    case ts: StreamPhysicalTableSourceScan =>
      // the source provides the delete kind derived from its own changelog mode
      val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
      createNewNode(rel, Some(List()), providedTrait)

    case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
        _: StreamPhysicalValues =>
      createNewNode(rel, Some(List()), DeleteKindTrait.NONE)

    case _: StreamPhysicalIntermediateTableScan =>
      createNewNode(rel, Some(List()), fullDeleteOrNone(getModifyKindSet(rel)))

    case _ =>
      throw new UnsupportedOperationException(
        s"Unsupported visit for ${rel.getClass.getSimpleName}")
  }
}