in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala [177:559]
/**
 * Validates that `plan` is a streaming query that only uses operations supported by
 * Structured Streaming's incremental execution under the given `outputMode`.
 *
 * The checks fall into three groups, performed in order:
 *   1. Whole-plan structural rules (must have a streaming source; limits on how many
 *      `mapGroupsWithState` / `flatMapGroupsWithState` / `applyInPandasWithState`
 *      operators may appear, and in which output modes they may be combined).
 *   2. Output-mode compatibility of streaming aggregations (e.g. Append requires a
 *      watermark on the grouping keys; Complete requires at least one aggregation).
 *   3. A per-node walk of the plan rejecting operators that are unsupported in a
 *      streaming context (commands, certain join types, Except/Intersect with
 *      streams, sorting/sampling/limits under certain modes, etc.).
 *
 * @param plan       the analyzed logical plan of the streaming query
 * @param outputMode the output mode the query will be executed with
 *                   (Append, Update or Complete)
 * @throws org.apache.spark.sql.AnalysisException (via `throwError`) when an
 *         unsupported operation or mode combination is found
 */
def checkForStreaming(plan: LogicalPlan, outputMode: OutputMode): Unit = {
// A plan with no streaming source must go through the batch write path, not
// writeStream.start().
if (!plan.isStreaming) {
throwError(
"Queries without streaming sources cannot be executed with writeStream.start()")(plan)
}
/** Collect all the streaming aggregates in a sub plan. */
def collectStreamingAggregates(subplan: LogicalPlan): Seq[Aggregate] = {
subplan.collect {
case a: Aggregate if a.isStreaming => a
// Since the Distinct node will be replaced to Aggregate in the optimizer rule
// [[ReplaceDistinctWithAggregate]], here we also need to check all Distinct node by
// assuming it as Aggregate.
case d @ Distinct(c: LogicalPlan) if d.isStreaming => Aggregate(c.output, c.output, c)
}
}
// Gather the two flavors of *GroupsWithState separately; the rules for combining
// them differ (see the checks below).
val mapGroupsWithStates = plan.collect {
case f: FlatMapGroupsWithState if f.isStreaming && f.isMapGroupsWithState => f
}
// Disallow multiple `mapGroupsWithState`s.
if (mapGroupsWithStates.size >= 2) {
throwError(
"Multiple mapGroupsWithStates are not supported on a streaming DataFrames/Datasets")(plan)
}
val flatMapGroupsWithStates = plan.collect {
case f: FlatMapGroupsWithState if f.isStreaming && !f.isMapGroupsWithState => f
}
// Disallow mixing `mapGroupsWithState`s and `flatMapGroupsWithState`s
if (mapGroupsWithStates.nonEmpty && flatMapGroupsWithStates.nonEmpty) {
throwError(
"Mixing mapGroupsWithStates and flatMapGroupsWithStates are not supported on a " +
"streaming DataFrames/Datasets")(plan)
}
// Only allow multiple `FlatMapGroupsWithState(Append)`s in append mode: both the
// query output mode AND every operator's own output mode must be Append.
if (flatMapGroupsWithStates.size >= 2 && (
outputMode != InternalOutputModes.Append ||
flatMapGroupsWithStates.exists(_.outputMode != InternalOutputModes.Append)
)) {
throwError(
"Multiple flatMapGroupsWithStates are not supported when they are not all in append mode" +
" or the output mode is not append on a streaming DataFrames/Datasets")(plan)
}
val applyInPandasWithStates = plan.collect {
case f: FlatMapGroupsInPandasWithState if f.isStreaming => f
}
// Disallow multiple `applyInPandasWithState`s.
if (applyInPandasWithStates.size > 1) {
throwError(
"Multiple applyInPandasWithStates are not supported on a streaming " +
"DataFrames/Datasets")(plan)
}
// check to see that if store encoding format is set to true, then we have no stateful
// operators in the query or only variants of operators that support avro encoding such as
// transformWithState.
checkSupportedStoreEncodingFormats(plan)
val aggregates = collectStreamingAggregates(plan)
// Disallow output modes that are incompatible with the presence/absence of
// streaming aggregations. Only the first aggregate's grouping keys are inspected
// here; multi-aggregate constraints are enforced elsewhere.
outputMode match {
case InternalOutputModes.Append if aggregates.nonEmpty =>
val aggregate = aggregates.head
// Find any attributes that are associated with an eventTime watermark.
val watermarkAttributes = aggregate.groupingExpressions.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}
// We can append rows to the sink once the group is under the watermark. Without this
// watermark a group is never "finished" so we would never output anything.
if (watermarkAttributes.isEmpty) {
throw QueryCompilationErrors.unsupportedOutputModeForStreamingOperationError(
outputMode, "streaming aggregations without watermark")
}
case InternalOutputModes.Update if aggregates.nonEmpty =>
val aggregate = aggregates.head
// Session-window aggregations are detected via the SessionWindow marker metadata
// on a grouping attribute; they cannot emit partial (updated) session results.
val existingSessionWindow = aggregate.groupingExpressions.exists {
case attr: AttributeReference
if attr.metadata.contains(SessionWindow.marker) &&
attr.metadata.getBoolean(SessionWindow.marker) => true
case _ => false
}
if (existingSessionWindow) {
throw QueryCompilationErrors.unsupportedOutputModeForStreamingOperationError(
outputMode, "session window streaming aggregations")
}
case InternalOutputModes.Complete if aggregates.isEmpty =>
// Complete mode re-emits the entire result every trigger, which is only
// meaningful (and bounded) when the result is an aggregation.
throw QueryCompilationErrors.unsupportedOutputModeForStreamingOperationError(
outputMode, "no streaming aggregations")
case _ =>
}
/**
 * Whether the subplan will contain complete data or incremental data in every incremental
 * execution. Some operations may be allowed only when the child logical plan gives complete
 * data.
 */
def containsCompleteData(subplan: LogicalPlan): Boolean = {
val aggs = subplan.collect { case a@Aggregate(_, _, _, _) if a.isStreaming => a }
// Either the subplan has no streaming source, or it has aggregation with Complete mode
!subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
}
/**
 * Rejects expressions that are inherently non-deterministic across incremental
 * executions (currently only `monotonically_increasing_id`).
 */
def checkUnsupportedExpressions(implicit operator: LogicalPlan): Unit = {
val unsupportedExprs = operator.expressions.flatMap(_.collect {
case m: MonotonicallyIncreasingID => m
}).distinct
if (unsupportedExprs.nonEmpty) {
throwError("Expression(s): " + unsupportedExprs.map(_.sql).mkString(", ") +
" is not supported with streaming DataFrames/Datasets")
}
}
// Walk the plan bottom-up so errors are reported against the offending operator
// (the implicit `subPlan` is picked up by throwError/throwErrorIf).
plan.foreachUp { implicit subPlan =>
// Operations that cannot exists anywhere in a streaming plan
subPlan match {
case Aggregate(groupingExpressions, aggregateExpressions, child, _) =>
val distinctAggExprs = aggregateExpressions.flatMap { expr =>
expr.collect { case ae: AggregateExpression if ae.isDistinct => ae }
}
val haveGroupingSets = groupingExpressions.exists(_.isInstanceOf[GroupingSets])
// Exact distinct counting would require retaining all distinct values as state.
throwErrorIf(
child.isStreaming && distinctAggExprs.nonEmpty,
"Distinct aggregations are not supported on streaming DataFrames/Datasets. Consider " +
"using approx_count_distinct() instead.")
throwErrorIf(
child.isStreaming && haveGroupingSets,
"Grouping Sets is not supported on streaming DataFrames/Datasets"
)
case _: Command =>
throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
"streaming DataFrames/Datasets")
case _: InsertIntoDir =>
throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets")
// mapGroupsWithState and flatMapGroupsWithState
case m: FlatMapGroupsWithState if m.isStreaming =>
// Check compatibility with output modes and aggregations in query
val aggsInQuery = collectStreamingAggregates(plan)
if (m.initialState.isStreaming) {
// initial state has to be a batch relation
throwError("Non-streaming DataFrame/Dataset is not supported as the" +
" initial state in [flatMap|map]GroupsWithState operation on a streaming" +
" DataFrame/Dataset")
}
if (m.isMapGroupsWithState) { // check mapGroupsWithState
// allowed only in update query output mode
if (outputMode != InternalOutputModes.Update) {
throwError(
"mapGroupsWithState is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
}
} else { // check flatMapGroupsWithState
if (aggsInQuery.isEmpty) {
// flatMapGroupsWithState without aggregation: operation's output mode must
// match query output mode
m.outputMode match {
case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
throwError(
"flatMapGroupsWithState in update mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
throwError(
"flatMapGroupsWithState in append mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
case _ =>
}
} else {
// flatMapGroupsWithState with aggregation: update operation mode not allowed
if (m.outputMode == InternalOutputModes.Update) {
throwError(
"flatMapGroupsWithState in update mode is not supported with " +
"aggregation on a streaming DataFrame/Dataset")
}
}
}
// Check compatibility with timeout configs
if (m.timeout == EventTimeTimeout) {
// With event time timeout, watermark must be defined.
val watermarkAttributes = m.child.output.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}
if (watermarkAttributes.isEmpty) {
throwError(
"Watermark must be specified in the query using " +
"'[Dataset/DataFrame].withWatermark()' for using event-time timeout in a " +
"[map|flatMap]GroupsWithState. Event-time timeout not supported without " +
"watermark.")(plan)
}
}
// applyInPandasWithState — mirrors the flatMapGroupsWithState rules above.
case m: FlatMapGroupsInPandasWithState if m.isStreaming =>
// Check compatibility with output modes and aggregations in query
val aggsInQuery = collectStreamingAggregates(plan)
if (aggsInQuery.isEmpty) {
// applyInPandasWithState without aggregation: operation's output mode must
// match query output mode
m.outputMode match {
case InternalOutputModes.Update if outputMode != InternalOutputModes.Update =>
throwError(
"applyInPandasWithState in update mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
case InternalOutputModes.Append if outputMode != InternalOutputModes.Append =>
throwError(
"applyInPandasWithState in append mode is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
case _ =>
}
} else {
// applyInPandasWithState with aggregation: update operation mode not allowed, and
// *groupsWithState after aggregation not allowed
if (m.outputMode == InternalOutputModes.Update) {
throwError(
"applyInPandasWithState in update mode is not supported with " +
"aggregation on a streaming DataFrame/Dataset")
} else if (collectStreamingAggregates(m).nonEmpty) {
throwError(
"applyInPandasWithState in append mode is not supported after " +
"aggregation on a streaming DataFrame/Dataset")
}
}
// Check compatibility with timeout configs
if (m.timeout == EventTimeTimeout) {
// With event time timeout, watermark must be defined.
val watermarkAttributes = m.child.output.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}
if (watermarkAttributes.isEmpty) {
throwError(
"Watermark must be specified in the query using " +
"'[Dataset/DataFrame].withWatermark()' for using event-time timeout in a " +
"applyInPandasWithState. Event-time timeout not supported without " +
"watermark.")(plan)
}
}
case j @ Join(left, right, joinType, condition, _) =>
// Stream-stream joins are only supported in Append mode; each join type below
// additionally restricts which side may be streaming.
if (left.isStreaming && right.isStreaming && outputMode != InternalOutputModes.Append) {
throwError("Join between two streaming DataFrames/Datasets is not supported" +
s" in ${outputMode} output mode, only in Append output mode")
}
joinType match {
case _: InnerLike =>
// no further validations needed
case FullOuter =>
if (left.isStreaming && !right.isStreaming) {
throwError("FullOuter joins with streaming DataFrames/Datasets on the left " +
"and a static DataFrame/Dataset on the right is not supported")
} else if (!left.isStreaming && right.isStreaming) {
throwError("FullOuter joins with streaming DataFrames/Datasets on the right " +
"and a static DataFrame/Dataset on the left is not supported")
} else if (left.isStreaming && right.isStreaming) {
checkForStreamStreamJoinWatermark(j)
}
case LeftAnti =>
if (right.isStreaming) {
throwError(s"$LeftAnti joins with a streaming DataFrame/Dataset " +
"on the right are not supported")
}
// We support streaming left outer and left semi joins with static on the right always,
// and with stream on both sides under the appropriate conditions.
case LeftOuter | LeftSemi =>
if (!left.isStreaming && right.isStreaming) {
throwError(s"$joinType join with a streaming DataFrame/Dataset " +
"on the right and a static DataFrame/Dataset on the left is not supported")
} else if (left.isStreaming && right.isStreaming) {
checkForStreamStreamJoinWatermark(j)
}
// We support streaming right outer joins with static on the left always, and with
// stream on both sides under the appropriate conditions.
case RightOuter =>
if (left.isStreaming && !right.isStreaming) {
throwError("RightOuter join with a streaming DataFrame/Dataset on the left and " +
"a static DataFrame/DataSet on the right not supported")
} else if (left.isStreaming && right.isStreaming) {
checkForStreamStreamJoinWatermark(j)
}
case NaturalJoin(_) | UsingJoin(_, _) =>
// They should not appear in an analyzed plan.
case _ =>
throwError(s"Join type $joinType is not supported with streaming DataFrame/Dataset")
}
case d: DeduplicateWithinWatermark if d.isStreaming =>
// Find any attributes that are associated with an eventTime watermark.
val watermarkAttributes = d.child.output.collect {
case a: Attribute if a.metadata.contains(EventTimeWatermark.delayKey) => a
}
// DeduplicateWithinWatermark requires event time column being set in the input DataFrame
if (watermarkAttributes.isEmpty) {
throwError(
"dropDuplicatesWithinWatermark is not supported on streaming DataFrames/DataSets " +
"without watermark")(plan)
}
case c: CoGroup if c.children.exists(_.isStreaming) =>
throwError("CoGrouping with a streaming DataFrame/Dataset is not supported")
// distinct.size == 2 means the children are a mix of streaming and batch plans.
case u: Union if u.children.map(_.isStreaming).distinct.size == 2 =>
throwError("Union between streaming and batch DataFrames/Datasets is not supported")
case Except(left, right, _) if right.isStreaming =>
throwError("Except on a streaming DataFrame/Dataset on the right is not supported")
case Intersect(left, right, _) if left.isStreaming || right.isStreaming =>
throwError("Intersect of streaming DataFrames/Datasets is not supported")
case GlobalLimit(_, _) | LocalLimit(_, _)
if subPlan.children.forall(_.isStreaming) && outputMode == InternalOutputModes.Update =>
throwError("Limits are not supported on streaming DataFrames/Datasets in Update " +
"output mode")
case Offset(_, _) => throwError("Offset is not supported on streaming DataFrames/Datasets")
// Sorting incremental output is meaningless; it is only allowed over a complete
// (fully re-emitted) aggregation result — see containsCompleteData above.
case Sort(_, _, _, _) if !containsCompleteData(subPlan) =>
throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on " +
"aggregated DataFrame/Dataset in Complete output mode")
case Sample(_, _, _, _, child) if child.isStreaming =>
throwError("Sampling is not supported on streaming DataFrames/Datasets")
case Window(windowExpression, _, _, child, _) if child.isStreaming =>
// Collect each window function with its output column and spec so the error can
// list everything that is unsupported in one message.
val (windowFuncList, columnNameList, windowSpecList) = windowExpression.flatMap { e =>
e.collect {
case we: WindowExpression =>
(we.windowFunction.toString, e.toAttribute.sql, we.windowSpec.sql)
}
}.unzip3
throw QueryExecutionErrors.nonTimeWindowNotSupportedInStreamingError(
windowFuncList,
columnNameList,
windowSpecList,
subPlan.origin)
case ReturnAnswer(child) if child.isStreaming =>
throwError("Cannot return immediate result on streaming DataFrames/Dataset. Queries " +
"with streaming DataFrames/Datasets must be executed with writeStream.start().")
case _ =>
}
// Check if there are unsupported expressions in streaming query plan.
checkUnsupportedExpressions(subPlan)
}
// Finally, verify watermark propagation rules that span the whole plan.
checkStreamingQueryGlobalWatermarkLimit(plan, outputMode)
}