in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala [152:450]
def visit(
    rel: StreamPhysicalRel,
    requiredTrait: ModifyKindSetTrait,
    requester: String): StreamPhysicalRel = {
  // Dispatches on the concrete physical node type. Every non-sink branch
  //   1. visits the node's inputs (if any) with the ModifyKindSetTrait the node can consume,
  //   2. determines which change kinds the node itself provides, and
  //   3. hands node, visited inputs, provided trait, `requiredTrait` and `requester`
  //      over to createNewNode.
  // Sink branches are rebuilt directly via copy(...) and do not consult `requiredTrait`.
  //
  // rel           - the physical node to visit
  // requiredTrait - the change kinds the caller requires from `rel`
  // requester     - name of the requiring party, forwarded to createNewNode / visitChildren
  //                 (presumably used in error messages - see the Match Recognize branch)
  //
  // Throws UnsupportedOperationException when no branch exists for the node type or for the
  // table sink wrapped by a legacy sink node.

  // Shared by the (Python) group aggregate branches: the aggregate always provides INSERT and
  // UPDATE; DELETE is added only when the input itself contains UPDATE or DELETE.
  def groupAggregateProvidedTrait(inputKinds: ModifyKindSet): ModifyKindSetTrait = {
    val kinds = ModifyKindSet
      .newBuilder()
      .addContainedKind(ModifyKind.INSERT)
      .addContainedKind(ModifyKind.UPDATE)
    val inputIsUpdating =
      inputKinds.contains(ModifyKind.UPDATE) || inputKinds.contains(ModifyKind.DELETE)
    if (inputIsUpdating) {
      kinds.addContainedKind(ModifyKind.DELETE)
    }
    new ModifyKindSetTrait(kinds.build())
  }

  rel match {
    case sink: StreamPhysicalSink =>
      val name = s"Table sink '${sink.contextResolvedTable.getIdentifier.asSummaryString()}'"
      // The table sink is asked for its changelog mode, given the mode returned by
      // deriveQueryDefaultChangelogMode for the sink's input.
      val defaultQueryMode = deriveQueryDefaultChangelogMode(sink.getInput, name)
      val requestedMode = sink.tableSink.getChangelogMode(defaultQueryMode)
      val inputs = visitChildren(sink, ModifyKindSetTrait.fromChangelogMode(requestedMode), name)
      // The sink is the true root, so the trait required by the context is not considered.
      val rootTraits = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
      sink.copy(rootTraits, inputs).asInstanceOf[StreamPhysicalRel]

    case sink: StreamPhysicalLegacySink[_] =>
      // Map the legacy sink flavour to the changes it accepts plus a requester name.
      val (acceptedBySink, sinkName) = sink.sink match {
        case _: UpsertStreamTableSink[_] =>
          (ModifyKindSetTrait.ALL_CHANGES, "UpsertStreamTableSink")
        case _: RetractStreamTableSink[_] =>
          (ModifyKindSetTrait.ALL_CHANGES, "RetractStreamTableSink")
        case _: AppendStreamTableSink[_] =>
          (ModifyKindSetTrait.INSERT_ONLY, "AppendStreamTableSink")
        case _: StreamTableSink[_] =>
          (ModifyKindSetTrait.INSERT_ONLY, "StreamTableSink")
        case ds: DataStreamTableSink[_] if ds.withChangeFlag =>
          (ModifyKindSetTrait.ALL_CHANGES, "toRetractStream")
        case _: DataStreamTableSink[_] =>
          (ModifyKindSetTrait.INSERT_ONLY, "toAppendStream")
        case _ =>
          throw new UnsupportedOperationException(
            s"Unsupported sink '${sink.sink.getClass.getSimpleName}'")
      }
      val inputs = visitChildren(sink, acceptedBySink, sinkName)
      // As above: the sink is the true root, the context's required trait is not considered.
      val rootTraits = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
      sink.copy(rootTraits, inputs).asInstanceOf[StreamPhysicalRel]

    case groupAgg: StreamPhysicalGroupAggregate =>
      // Group aggregate consumes every kind of change.
      val inputs = visitChildren(groupAgg, ModifyKindSetTrait.ALL_CHANGES)
      val provided = groupAggregateProvidedTrait(getModifyKindSet(inputs.head))
      createNewNode(groupAgg, inputs, provided, requiredTrait, requester)

    case tableAgg: StreamPhysicalGroupTableAggregateBase =>
      // Table aggregate consumes every kind of change and provides all changes,
      // deletions included.
      val inputs = visitChildren(tableAgg, ModifyKindSetTrait.ALL_CHANGES)
      createNewNode(tableAgg, inputs, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

    case pythonAgg: StreamPhysicalPythonGroupAggregate =>
      // Same derivation as the group aggregate branch.
      val inputs = visitChildren(pythonAgg, ModifyKindSetTrait.ALL_CHANGES)
      val provided = groupAggregateProvidedTrait(getModifyKindSet(inputs.head))
      createNewNode(pythonAgg, inputs, provided, requiredTrait, requester)

    case groupWindow: StreamPhysicalGroupWindowAggregateBase =>
      // Group window (table) aggregate consumes every kind of change.
      val inputs = visitChildren(groupWindow, ModifyKindSetTrait.ALL_CHANGES)
      // INSERT is always provided; UPDATE only when the emit strategy produces updates.
      val kinds = ModifyKindSet.newBuilder().addContainedKind(ModifyKind.INSERT)
      if (groupWindow.emitStrategy.produceUpdates) {
        kinds.addContainedKind(ModifyKind.UPDATE)
      }
      val provided = new ModifyKindSetTrait(kinds.build())
      createNewNode(groupWindow, inputs, provided, requiredTrait, requester)

    case windowAgg: StreamPhysicalWindowAggregate =>
      // Window aggregate is visited with all changes allowed in its input.
      val inputs = visitChildren(windowAgg, ModifyKindSetTrait.ALL_CHANGES)
      // TODO support early / late fire and then this node may produce update records
      val provided = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)
      createNewNode(windowAgg, inputs, provided, requiredTrait, requester)

    case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate =>
      // WindowRank and WindowDeduplicate: insert-only input, insert-only output.
      val inputs = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
      createNewNode(rel, inputs, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

    case rank: StreamPhysicalRank if RankUtil.isDeduplication(rank) =>
      val inputs = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
      val tableConfig = unwrapTableConfig(rank)
      // A deduplicating rank forwards insert-only when all of its inputs are insert-only and
      // RankUtil reports that the deduplicate can emit insert-only for this configuration
      // (original note: first row is kept and mini batching is disabled).
      val allInputsInsertOnly = inputs.forall(ChangelogPlanUtils.isInsertOnly)
      val mustProvideAllChanges =
        !allInputsInsertOnly || !RankUtil.outputInsertOnlyInDeduplicate(
          tableConfig,
          RankUtil.keepLastDeduplicateRow(rank.orderKey))
      val provided =
        if (mustProvideAllChanges) ModifyKindSetTrait.ALL_CHANGES
        else ModifyKindSetTrait.INSERT_ONLY
      createNewNode(rel, inputs, provided, requiredTrait, requester)

    case rank: StreamPhysicalRank if !RankUtil.isDeduplication(rank) =>
      // A regular rank consumes and provides all changes.
      val inputs = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
      createNewNode(rel, inputs, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

    case limit: StreamPhysicalLimit =>
      // Limit consumes every kind of change; it stays insert-only only if its input is.
      val inputs = visitChildren(limit, ModifyKindSetTrait.ALL_CHANGES)
      val inputHasUpdates = !getModifyKindSet(inputs.head).isInsertOnly
      val provided =
        if (inputHasUpdates) ModifyKindSetTrait.ALL_CHANGES
        else ModifyKindSetTrait.INSERT_ONLY
      createNewNode(limit, inputs, provided, requiredTrait, requester)

    case _: StreamPhysicalSortLimit =>
      // SortLimit consumes and provides all changes.
      val inputs = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
      createNewNode(rel, inputs, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

    case sort: StreamPhysicalSort =>
      // Sort consumes all changes. It buffers all inputs and produces insert-only messages
      // once the input is finished.
      val inputs = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
      createNewNode(sort, inputs, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

    case matchNode: StreamPhysicalMatch =>
      // CEP: insert-only in, insert-only out. A dedicated requester name is passed so that the
      // exception message reads better.
      val inputs = visitChildren(matchNode, ModifyKindSetTrait.INSERT_ONLY, "Match Recognize")
      createNewNode(matchNode, inputs, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

    case over: StreamPhysicalOverAggregate =>
      val groups = over.logicWindow.groups
      // True only when an order key exists and it is neither a row-time nor a proc-time
      // attribute. All aggregates are computed over the same window and ORDER BY is supported
      // for a single field only, hence only the first group / first collation is inspected.
      val ordersByRegularField = !groups.isEmpty && {
        val collations = groups.get(0).orderKeys.getFieldCollations
        !collations.isEmpty && {
          val keyIndex = collations.get(0).getFieldIndex
          val keyType = over.logicWindow.getRowType.getFieldList.get(keyIndex).getType
          !(FlinkTypeFactory.isRowtimeIndicatorType(keyType) ||
            FlinkTypeFactory.isProctimeIndicatorType(keyType))
        }
      }
      // Time-attribute ordering: insert-only in and out. Otherwise updates and deletes are
      // both accepted from the input and provided by the node.
      val kinds = ModifyKindSet.newBuilder().addContainedKind(ModifyKind.INSERT)
      val acceptedByOver = if (ordersByRegularField) {
        kinds.addContainedKind(ModifyKind.UPDATE)
        kinds.addContainedKind(ModifyKind.DELETE)
        ModifyKindSetTrait.ALL_CHANGES
      } else {
        ModifyKindSetTrait.INSERT_ONLY
      }
      val inputs = visitChildren(over, acceptedByOver)
      val provided = new ModifyKindSetTrait(kinds.build())
      createNewNode(over, inputs, provided, requiredTrait, requester)

    case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
        _: StreamPhysicalPythonOverAggregate =>
      // TemporalSort, IntervalJoin and Python OverAggregate: insert-only in, insert-only out.
      val inputs = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
      createNewNode(rel, inputs, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

    case join: StreamPhysicalJoin =>
      // Regular join consumes every kind of change on both sides.
      val inputs = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES)
      val leftKinds = getModifyKindSet(inputs.head)
      val rightKinds = getModifyKindSet(inputs.last)
      val provided = join.joinSpec.getJoinType match {
        case FlinkJoinType.INNER | FlinkJoinType.SEMI =>
          // Inner and semi joins just forward whatever either side can produce.
          new ModifyKindSetTrait(leftKinds.union(rightKinds))
        case _ =>
          // Any other join type may produce any kind of change.
          ModifyKindSetTrait.ALL_CHANGES
      }
      createNewNode(join, inputs, provided, requiredTrait, requester)

    case windowJoin: StreamPhysicalWindowJoin =>
      // Currently, window join only takes insert-only input and provides insert-only output.
      val inputs = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
      createNewNode(windowJoin, inputs, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

    case temporalJoin: StreamPhysicalTemporalJoin =>
      // Currently, temporal join accepts all kinds of changes, including on the right side.
      val inputs = visitChildren(temporalJoin, ModifyKindSetTrait.ALL_CHANGES)
      // The changes of the left input are forwarded as the provided trait.
      val leftInputTrait = inputs.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
      createNewNode(temporalJoin, inputs, leftInputTrait, requiredTrait, requester)

    case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
        _: StreamPhysicalLookupJoin | _: StreamPhysicalExchange | _: StreamPhysicalExpand |
        _: StreamPhysicalMiniBatchAssigner | _: StreamPhysicalWatermarkAssigner |
        _: StreamPhysicalWindowTableFunction =>
      // Pass-through nodes: the caller's requirement goes to the inputs unchanged and the
      // trait of the first input comes back as the provided trait.
      val inputs = visitChildren(rel, requiredTrait, requester)
      val firstInputTrait = inputs.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
      createNewNode(rel, inputs, firstInputTrait, requiredTrait, requester)

    case union: StreamPhysicalUnion =>
      // The caller's requirement goes to the inputs unchanged; the union provides every
      // change kind that any of its inputs provides.
      val inputs = visitChildren(rel, requiredTrait, requester)
      val unionedKinds = ModifyKindSet.union(inputs.map(getModifyKindSet): _*)
      val provided = new ModifyKindSetTrait(unionedKinds)
      createNewNode(union, inputs, provided, requiredTrait, requester)

    case normalize: StreamPhysicalChangelogNormalize =>
      // Changelog normalize accepts update and delete input and outputs all changes.
      val inputs = visitChildren(normalize, ModifyKindSetTrait.ALL_CHANGES)
      createNewNode(normalize, inputs, ModifyKindSetTrait.ALL_CHANGES, requiredTrait, requester)

    case sourceScan: StreamPhysicalTableSourceScan =>
      // The provided changes are whatever changelog mode the table source declares.
      val provided = ModifyKindSetTrait.fromChangelogMode(sourceScan.tableSource.getChangelogMode)
      createNewNode(sourceScan, Nil, provided, requiredTrait, requester)

    case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
        _: StreamPhysicalValues =>
      // DataStream scans, legacy table source scans and Values provide insert-only messages.
      createNewNode(rel, Nil, ModifyKindSetTrait.INSERT_ONLY, requiredTrait, requester)

    case intermediateScan: StreamPhysicalIntermediateTableScan =>
      // The provided changes are the modify kind set recorded on the intermediate table.
      val provided = new ModifyKindSetTrait(intermediateScan.intermediateTable.modifyKindSet)
      createNewNode(intermediateScan, Nil, provided, requiredTrait, requester)

    case process: StreamPhysicalProcessTableFunction =>
      // What each input may contain depends on the declaration of the corresponding table
      // argument: arguments with SUPPORT_UPDATES take all changes, all others insert-only.
      val perInputTraits = StreamPhysicalProcessTableFunction
        .getProvidedInputArgs(process.getCall)
        .map(_.e)
        .map {
          declaredArg =>
            if (declaredArg.is(StaticArgumentTrait.SUPPORT_UPDATES)) {
              ModifyKindSetTrait.ALL_CHANGES
            } else {
              ModifyKindSetTrait.INSERT_ONLY
            }
        }
        .toList
      val inputs = if (perInputTraits.nonEmpty) {
        visitChildren(process, perInputTraits)
      } else {
        // A constant function has a single StreamPhysicalValues input.
        visitChildren(process, ModifyKindSetTrait.INSERT_ONLY)
      }
      // Ask the PTF whether it is updating or non-updating.
      val provided = queryPtfChangelogMode(
        process,
        inputs,
        requiredTrait.modifyKindSet.toChangelogModeBuilder.build(),
        ModifyKindSetTrait.fromChangelogMode,
        ModifyKindSetTrait.INSERT_ONLY)
      createNewNode(process, inputs, provided, requiredTrait, requester)

    case _ =>
      throw new UnsupportedOperationException(
        s"Unsupported visit for ${rel.getClass.getSimpleName}")
  }
}