in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala [251:929]
def checkAnalysis0(plan: LogicalPlan): Unit = {
// The target table is not a child plan of the insert command, so we do a top-down
// traversal to report table-not-found errors before any errors in the insert
// command's input query.
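// For example (a hypothetical query, assuming `missing_tbl` does not exist):
//   INSERT INTO missing_tbl SELECT bad_col FROM t
// should report that `missing_tbl` was not found, not that `bad_col` is unresolved.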
plan.foreach {
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
u.tableNotFound(u.multipartIdentifier)
// TODO (SPARK-27484): handle streaming write commands when we have them.
case write: V2WriteCommand if write.table.isInstanceOf[UnresolvedRelation] =>
val tblName = write.table.asInstanceOf[UnresolvedRelation].multipartIdentifier
write.table.tableNotFound(tblName)
// Check for trailing-comma errors first: a bottom-up traversal would instead surface
// less obvious unresolved-column errors.
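// E.g. a query like `SELECT col1, FROM tbl` (note the trailing comma) should be
// reported as a trailing-comma error here rather than as an unresolved column later.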
case proj: Project =>
checkTrailingCommaInSelect(proj)
case agg: Aggregate =>
checkTrailingCommaInSelect(agg)
case unionLoop: UnionLoop =>
// Recursive CTEs have already had their Union replaced with UnionLoop at this stage.
// Here we perform additional checks on them.
checkIfSelfReferenceIsPlacedCorrectly(unionLoop, unionLoop.id)
case _ =>
}
// We traverse the plan bottom-up and order the rules so as to catch the first possible
// failure instead of the result of cascading resolution failures.
plan.foreachUp {
case p if p.analyzed => // Skip already analyzed sub-plans
case leaf: LeafNode if !SQLConf.get.preserveCharVarcharTypeInfo &&
leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
throw SparkException.internalError(
s"Logical plan should not have output of char/varchar type when " +
s"${SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO.key} is false: " + leaf)
case u: UnresolvedNamespace =>
u.schemaNotFound(u.multipartIdentifier)
case u: UnresolvedTable =>
u.tableNotFound(u.multipartIdentifier)
case u: UnresolvedView =>
u.tableNotFound(u.multipartIdentifier)
case u: UnresolvedTableOrView =>
u.tableNotFound(u.multipartIdentifier)
case u: UnresolvedRelation =>
u.tableNotFound(u.multipartIdentifier)
case u: UnresolvedFunctionName =>
val catalogPath = (currentCatalog.name +: catalogManager.currentNamespace).mkString(".")
throw QueryCompilationErrors.unresolvedRoutineError(
u.multipartIdentifier,
Seq("system.builtin", "system.session", catalogPath),
u.origin)
case u: UnresolvedHint =>
throw SparkException.internalError(
msg = s"Hint not found: ${toSQLId(u.name)}",
context = u.origin.getQueryContext,
summary = u.origin.context.summary)
case u: UnresolvedInlineTable if unresolvedInlineTableContainsScalarSubquery(u) =>
throw QueryCompilationErrors.inlineTableContainsScalarSubquery(u)
case command: V2PartitionCommand =>
command.table match {
case r @ ResolvedTable(_, _, table, _) => table match {
case t: SupportsPartitionManagement =>
if (t.partitionSchema.isEmpty) {
r.failAnalysis(
errorClass = "INVALID_PARTITION_OPERATION.PARTITION_SCHEMA_IS_EMPTY",
messageParameters = Map("name" -> toSQLId(r.name)))
}
case _ =>
r.failAnalysis(
errorClass = "INVALID_PARTITION_OPERATION.PARTITION_MANAGEMENT_IS_UNSUPPORTED",
messageParameters = Map("name" -> toSQLId(r.name)))
}
case _ =>
}
case o: OverwriteByExpression if o.deleteExpr.exists(_.isInstanceOf[SubqueryExpression]) =>
o.deleteExpr.failAnalysis(
errorClass = "UNSUPPORTED_FEATURE.OVERWRITE_BY_SUBQUERY",
messageParameters = Map.empty)
case operator: LogicalPlan =>
operator transformExpressionsDown {
case hof: HigherOrderFunction if hof.arguments.exists {
case LambdaFunction(_, _, _) => true
case _ => false
} =>
throw new AnalysisException(
errorClass =
"INVALID_LAMBDA_FUNCTION_CALL.PARAMETER_DOES_NOT_ACCEPT_LAMBDA_FUNCTION",
messageParameters = Map.empty,
origin = hof.origin
)
// Check the argument data types of higher-order functions on the way down first.
// If the arguments of a higher-order function are resolved but the type check fails,
// its function arguments will not get resolved either; report the argument type-check
// failure instead of claiming the function arguments are unresolved.
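// E.g. (hypothetically) `transform(1, x -> x + 1)`: the first argument must be an
// array, so we report that type mismatch rather than an unresolved lambda.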
case hof: HigherOrderFunction
if hof.argumentsResolved && hof.checkArgumentDataTypes().isFailure =>
hof.checkArgumentDataTypes() match {
case checkRes: TypeCheckResult.DataTypeMismatch =>
hof.dataTypeMismatch(hof, checkRes)
case checkRes: TypeCheckResult.InvalidFormat =>
hof.invalidFormat(checkRes)
}
case hof: HigherOrderFunction
if hof.resolved && hof.functions
.exists(_.exists(_.isInstanceOf[PythonUDF])) =>
val u = hof.functions.flatMap(_.find(_.isInstanceOf[PythonUDF])).head
hof.failAnalysis(
errorClass = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF",
messageParameters = Map("funcName" -> toSQLExpr(u)))
// If an attribute can't be resolved as a map key of string type, either the key should be
// surrounded with single quotes, or there is a typo in the attribute name.
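// E.g. for a map column `m` with string keys, `m[k]` where `k` is not a resolvable
// column is reported as UNRESOLVED_MAP_KEY, suggesting the literal form `m['k']`.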
case GetMapValue(map, key: Attribute) if isMapWithStringKey(map) && !key.resolved =>
failUnresolvedAttribute(operator, key, "UNRESOLVED_MAP_KEY")
case e: Expression if containsUnsupportedLCA(e, operator) =>
val lcaRefNames =
e.collect { case lcaRef: LateralColumnAliasReference => lcaRef.name }.distinct
failAnalysis(
errorClass = "UNSUPPORTED_FEATURE.LATERAL_COLUMN_ALIAS_IN_GENERATOR",
messageParameters =
Map("lca" -> toSQLId(lcaRefNames), "generatorExpr" -> toSQLExpr(e)))
}
// Fail if we still have an unresolved ALL in GROUP BY. This needs to run before the
// general unresolved check below so we can throw a more tailored error message.
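// E.g. (hypothetically) `SELECT c, rand() FROM t GROUP BY ALL` cannot infer the
// grouping columns and should fail with a dedicated GROUP BY ALL error.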
new ResolveReferencesInAggregate(catalogManager).checkUnresolvedGroupByAll(operator)
// Early checks for column definitions, to produce better error messages
ColumnDefinition.checkColumnDefinitions(operator)
var stagedError: Option[() => Unit] = None
getAllExpressions(operator).foreach(_.foreachUp {
case a: Attribute if !a.resolved =>
failUnresolvedAttribute(operator, a, "UNRESOLVED_COLUMN")
case s: Star =>
withPosition(s) {
throw QueryCompilationErrors.invalidStarUsageError(operator.nodeName, Seq(s))
}
// This case should come before `e.checkInputDataTypes()` to produce the correct error
// for unknown window expressions nested inside other expressions.
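// E.g. `SELECT count(*) OVER w FROM t` without a matching `WINDOW w AS (...)` clause
// fails here with an undefined window specification error.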
case UnresolvedWindowExpression(_, WindowSpecReference(windowName)) =>
throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName)
case e: Expression if e.checkInputDataTypes().isFailure =>
TypeCoercionValidation.failOnTypeCheckResult(e, Some(operator))
case c: Cast if !c.resolved =>
throw SparkException.internalError(
msg = s"Found the unresolved Cast: ${c.simpleString(SQLConf.get.maxToStringFields)}",
context = c.origin.getQueryContext,
summary = c.origin.context.summary)
case e: RuntimeReplaceable if !e.replacement.resolved =>
throw SparkException.internalError(
s"Cannot resolve the runtime replaceable expression ${toSQLExpr(e)}. " +
s"The replacement is unresolved: ${toSQLExpr(e.replacement)}.")
// `Grouping` and `GroupingID` are considered to have lower priority than the other
// error-causing nodes, so their failures are staged and thrown last.
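// E.g. `SELECT grouping(c) FROM t GROUP BY c` (with no GROUPING SETS/CUBE/ROLLUP)
// eventually fails with UNSUPPORTED_GROUPING_EXPRESSION, but only if no other error
// was found first.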
case g: Grouping =>
if (stagedError.isEmpty) stagedError = Some(() => g.failAnalysis(
errorClass = "UNSUPPORTED_GROUPING_EXPRESSION", messageParameters = Map.empty))
case g: GroupingID =>
if (stagedError.isEmpty) stagedError = Some(() => g.failAnalysis(
errorClass = "UNSUPPORTED_GROUPING_EXPRESSION", messageParameters = Map.empty))
case e: Expression if e.children.exists(_.isInstanceOf[WindowFunction]) &&
!e.isInstanceOf[WindowExpression] && e.resolved =>
val w = e.children.find(_.isInstanceOf[WindowFunction]).get
e.failAnalysis(
errorClass = "WINDOW_FUNCTION_WITHOUT_OVER_CLAUSE",
messageParameters = Map("funcName" -> toSQLExpr(w)))
case w @ WindowExpression(AggregateExpression(_, _, true, _, _), _) =>
w.failAnalysis(
errorClass = "DISTINCT_WINDOW_FUNCTION_UNSUPPORTED",
messageParameters = Map("windowExpr" -> toSQLExpr(w)))
case w @ WindowExpression(wf: FrameLessOffsetWindowFunction,
WindowSpecDefinition(_, order, frame: SpecifiedWindowFrame))
if order.isEmpty || !frame.isOffset =>
w.failAnalysis(
errorClass = "WINDOW_FUNCTION_AND_FRAME_MISMATCH",
messageParameters = Map(
"funcName" -> toSQLExpr(wf),
"windowExpr" -> toSQLExpr(w)))
case agg @ AggregateExpression(listAgg: ListAgg, _, _, _, _)
if agg.isDistinct && listAgg.needSaveOrderValue =>
throw QueryCompilationErrors.functionAndOrderExpressionMismatchError(
listAgg.prettyName, listAgg.child, listAgg.orderExpressions)
case w: WindowExpression =>
// Only allow window expressions whose window function is an aggregate expression,
// an offset window function, or a Pandas window UDF.
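// E.g. (a hypothetical query) `SELECT upper(c) OVER (PARTITION BY p) FROM t` puts a
// plain scalar function in the window-function position and is rejected below with
// UNSUPPORTED_EXPR_FOR_WINDOW.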
w.windowFunction match {
case agg @ AggregateExpression(fun: ListAgg, _, _, _, _)
// listagg(...) WITHIN GROUP (ORDER BY ...) OVER (ORDER BY ...) is unsupported
if fun.orderingFilled && (w.windowSpec.orderSpec.nonEmpty ||
w.windowSpec.frameSpecification !=
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)) =>
agg.failAnalysis(
errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)))
case agg @ AggregateExpression(
_: PercentileCont | _: PercentileDisc | _: Median, _, _, _, _)
if w.windowSpec.orderSpec.nonEmpty || w.windowSpec.frameSpecification !=
SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing) =>
agg.failAnalysis(
errorClass = "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
messageParameters = Map("aggFunc" -> toSQLExpr(agg.aggregateFunction)))
case _: AggregateExpression | _: FrameLessOffsetWindowFunction |
_: AggregateWindowFunction => // OK
case other =>
other.failAnalysis(
errorClass = "UNSUPPORTED_EXPR_FOR_WINDOW",
messageParameters = Map("sqlExpr" -> toSQLExpr(other)))
}
case s: SubqueryExpression =>
checkSubqueryExpression(operator, s)
case e: ExpressionWithRandomSeed if !e.seedExpression.foldable =>
e.failAnalysis(
errorClass = "SEED_EXPRESSION_IS_UNFOLDABLE",
messageParameters = Map(
"seedExpr" -> toSQLExpr(e.seedExpression),
"exprWithSeed" -> toSQLExpr(e)))
case p: Parameter =>
p.failAnalysis(
errorClass = "UNBOUND_SQL_PARAMETER",
messageParameters = Map("name" -> p.name))
case ma @ MultiAlias(child, names) if child.resolved && !child.isInstanceOf[Generator] =>
ma.failAnalysis(
errorClass = "MULTI_ALIAS_WITHOUT_GENERATOR",
messageParameters = Map("expr" -> toSQLExpr(child), "names" -> names.mkString(", ")))
case _ =>
})
// Check for unresolved TABLE arguments after the main check above to allow other analysis
// errors to apply first, providing better error messages.
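// E.g. passing `TABLE(t)` to a function that does not accept table arguments leaves a
// FunctionTableSubqueryArgumentExpression behind, which is rejected here.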
getAllExpressions(operator).foreach(_.foreachUp {
case expr: FunctionTableSubqueryArgumentExpression =>
expr.failAnalysis(
errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.UNSUPPORTED_TABLE_ARGUMENT",
messageParameters = Map("treeNode" -> planToString(plan)))
case _ =>
})
stagedError.foreach(_.apply())
operator match {
case RelationTimeTravel(u: UnresolvedRelation, _, _) =>
u.tableNotFound(u.multipartIdentifier)
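// The watermark column must be of TimestampType, or a window struct whose `end`
// field is a timestamp, as produced by e.g. (hypothetical usage)
// df.withWatermark("ts", "10 minutes") followed by a window aggregation.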
case etw: EventTimeWatermark =>
etw.eventTime.dataType match {
case s: StructType
if s.find(_.name == "end").exists(_.dataType == TimestampType) =>
case _: TimestampType =>
case _ =>
etw.failAnalysis(
errorClass = "EVENT_TIME_IS_NOT_ON_TIMESTAMP_TYPE",
messageParameters = Map(
"eventName" -> toSQLId(etw.eventTime.name),
"eventType" -> toSQLType(etw.eventTime.dataType)))
}
case f: Filter if f.condition.dataType != BooleanType =>
f.failAnalysis(
errorClass = "DATATYPE_MISMATCH.FILTER_NOT_BOOLEAN",
messageParameters = Map(
"sqlExpr" -> f.expressions.map(toSQLExpr).mkString(","),
"filter" -> toSQLExpr(f.condition),
"type" -> toSQLType(f.condition.dataType)))
case j @ Join(_, _, _, Some(condition), _) if condition.dataType != BooleanType =>
j.failAnalysis(
errorClass = "JOIN_CONDITION_IS_NOT_BOOLEAN_TYPE",
messageParameters = Map(
"joinCondition" -> toSQLExpr(condition),
"conditionType" -> toSQLType(condition.dataType)))
case j @ AsOfJoin(_, _, _, Some(condition), _, _, _)
if condition.dataType != BooleanType =>
throw SparkException.internalError(
msg = s"join condition '${toSQLExpr(condition)}' " +
s"of type ${toSQLType(condition.dataType)} is not a boolean.",
context = j.origin.getQueryContext,
summary = j.origin.context.summary)
case j @ AsOfJoin(_, _, _, _, _, _, Some(toleranceAssertion)) =>
if (!toleranceAssertion.foldable) {
j.failAnalysis(
errorClass = "AS_OF_JOIN.TOLERANCE_IS_UNFOLDABLE",
messageParameters = Map.empty)
}
if (!toleranceAssertion.eval().asInstanceOf[Boolean]) {
j.failAnalysis(
errorClass = "AS_OF_JOIN.TOLERANCE_IS_NON_NEGATIVE",
messageParameters = Map.empty)
}
case a: Aggregate => ExprUtils.assertValidAggregation(a)
case CollectMetrics(name, metrics, _, _) =>
if (name == null || name.isEmpty) {
operator.failAnalysis(
errorClass = "INVALID_OBSERVED_METRICS.MISSING_NAME",
messageParameters = Map("operator" -> planToString(operator)))
}
// Check if an expression is a valid metric. A metric must meet the following criteria:
// - It is not a window function;
// - It is not a nested aggregate function;
// - It is not a distinct aggregate function;
// - It uses non-deterministic functions only when they are nested inside an
//   aggregate function;
// - It references attributes only when they are nested inside an aggregate function.
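// E.g. (hypothetical Dataset.observe usage) `df.observe("m", count(lit(1)))` is a
// valid metric, while a bare `rand()` or a raw column reference outside any
// aggregate function is rejected below.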
def checkMetric(s: Expression, e: Expression, seenAggregate: Boolean = false): Unit = {
e match {
case _: WindowExpression =>
e.failAnalysis(
"INVALID_OBSERVED_METRICS.WINDOW_EXPRESSIONS_UNSUPPORTED",
Map("expr" -> toSQLExpr(s)))
case a: AggregateExpression if seenAggregate =>
e.failAnalysis(
"INVALID_OBSERVED_METRICS.NESTED_AGGREGATES_UNSUPPORTED",
Map("expr" -> toSQLExpr(s)))
case a: AggregateExpression if a.isDistinct =>
e.failAnalysis(
"INVALID_OBSERVED_METRICS.AGGREGATE_EXPRESSION_WITH_DISTINCT_UNSUPPORTED",
Map("expr" -> toSQLExpr(s)))
case a: AggregateExpression if a.filter.isDefined =>
e.failAnalysis(
"INVALID_OBSERVED_METRICS.AGGREGATE_EXPRESSION_WITH_FILTER_UNSUPPORTED",
Map("expr" -> toSQLExpr(s)))
case _: AggregateExpression | _: AggregateFunction =>
e.children.foreach(checkMetric(s, _, seenAggregate = true))
case _: Attribute if !seenAggregate =>
e.failAnalysis(
"INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_ATTRIBUTE",
Map("expr" -> toSQLExpr(s)))
case a: Alias =>
checkMetric(s, a.child, seenAggregate)
case a if !e.deterministic && !seenAggregate =>
e.failAnalysis(
"INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC",
Map("expr" -> toSQLExpr(s)))
case _ =>
e.children.foreach(checkMetric(s, _, seenAggregate))
}
}
metrics.foreach(m => checkMetric(m, m))
// see Analyzer.ResolveUnpivot
// the given ids must be AttributeReferences when no values are given
case up @ Unpivot(Some(ids), None, _, _, _, _)
if up.childrenResolved && ids.forall(_.resolved) &&
ids.exists(!_.isInstanceOf[AttributeReference]) =>
throw QueryCompilationErrors.unpivotRequiresAttributes("id", "value", up.ids.get)
// the given values must be AttributeReferences when no ids are given
case up @ Unpivot(None, Some(values), _, _, _, _)
if up.childrenResolved && values.forall(_.forall(_.resolved)) &&
values.exists(_.exists(!_.isInstanceOf[AttributeReference])) =>
throw QueryCompilationErrors.unpivotRequiresAttributes("value", "id", values.flatten)
// the given values must not be an empty seq
case up @ Unpivot(Some(ids), Some(Seq()), _, _, _, _)
if up.childrenResolved && ids.forall(_.resolved) =>
throw QueryCompilationErrors.unpivotRequiresValueColumns()
// all values must have the same length as the list of value column names
case up @ Unpivot(Some(ids), Some(values), _, _, _, _)
if up.childrenResolved && ids.forall(_.resolved) &&
values.exists(_.length != up.valueColumnNames.length) =>
throw QueryCompilationErrors.unpivotValueSizeMismatchError(up.valueColumnNames.length)
// see TypeCoercionBase.UnpivotCoercion
case up: Unpivot if up.canBeCoercioned && !up.valuesTypeCoercioned =>
throw QueryCompilationErrors.unpivotValueDataTypeMismatchError(up.values.get)
case Sort(orders, _, _, _) =>
orders.foreach { order =>
TypeUtils.tryThrowNotOrderableExpression(order)
}
case Window(_, partitionSpec, _, _, _) =>
// Both `partitionSpec` and `orderSpec` must be orderable. We only need an extra check
// for `partitionSpec` here because `orderSpec` already has its own type check.
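// E.g. (hypothetically) `... OVER (PARTITION BY map_col)` fails because map types
// are not orderable.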
partitionSpec.foreach { p =>
TypeUtils.tryThrowNotOrderableExpression(p)
}
case GlobalLimit(limitExpr, _) => checkLimitLikeClause("limit", limitExpr)
case LocalLimit(limitExpr, child) =>
checkLimitLikeClause("limit", limitExpr)
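// When LIMIT sits directly on top of OFFSET, both expressions are foldable
// non-negative integers (validated by checkLimitLikeClause), so additionally make
// sure their sum still fits in an Int: e.g. `LIMIT 2147483647 OFFSET 1` must fail
// rather than overflow.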
child match {
case Offset(offsetExpr, _) =>
val limit = limitExpr.eval().asInstanceOf[Int]
val offset = offsetExpr.eval().asInstanceOf[Int]
if (Int.MaxValue - limit < offset) {
child.failAnalysis(
errorClass = "SUM_OF_LIMIT_AND_OFFSET_EXCEEDS_MAX_INT",
messageParameters = Map(
"limit" -> limit.toString,
"offset" -> offset.toString))
}
case _ =>
}
case Offset(offsetExpr, _) => checkLimitLikeClause("offset", offsetExpr)
case Tail(limitExpr, _) => checkLimitLikeClause("tail", limitExpr)
case e @ (_: Union | _: SetOperation) if operator.children.length > 1 =>
def dataTypes(plan: LogicalPlan): Seq[DataType] = plan.output.map(_.dataType)
val ref = dataTypes(operator.children.head)
operator.children.tail.zipWithIndex.foreach { case (child, ti) =>
// Check the number of columns
if (child.output.length != ref.length) {
throw QueryCompilationErrors.numColumnsMismatch(
operator = operator.nodeName,
firstNumColumns = ref.length,
invalidOrdinalNum = ti + 1,
invalidNumColumns = child.output.length,
origin = operator.origin
)
}
val dataTypesAreCompatibleFn =
TypeCoercionValidation.getDataTypesAreCompatibleFn(operator)
// Check if the data types match.
dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, dt2), ci) =>
// SPARK-18058: we shall not care about the nullability of columns
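// E.g. (hypothetically) `SELECT array(1) UNION SELECT 'a'` has incompatible column
// types and fails, while INT columns differing only in nullability are fine.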
if (!dataTypesAreCompatibleFn(dt1, dt2)) {
throw QueryCompilationErrors.incompatibleColumnTypeError(
operator = operator.nodeName,
columnOrdinalNumber = ci,
tableOrdinalNumber = ti + 1,
dataType1 = dt1,
dataType2 = dt2,
hint = TypeCoercionValidation.getHintForOperatorCoercion(operator),
origin = operator.origin
)
}
}
}
case create: V2CreateTablePlan =>
val references = create.partitioning.flatMap(_.references).toSet
val badReferences = references.map(_.fieldNames).flatMap { column =>
create.tableSchema.findNestedField(column.toImmutableArraySeq) match {
case Some(_) =>
None
case _ =>
Some(column.quoted)
}
}
if (badReferences.nonEmpty) {
create.failAnalysis(
errorClass = "UNSUPPORTED_FEATURE.PARTITION_WITH_NESTED_COLUMN_IS_UNSUPPORTED",
messageParameters = Map(
"cols" -> badReferences.map(r => toSQLId(r)).mkString(", ")))
}
create.tableSchema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
SchemaUtils.checkIndeterminateCollationInSchema(create.tableSchema)
case write: V2WriteCommand if write.resolved =>
write.query.schema.foreach(f => TypeUtils.failWithIntervalType(f.dataType))
case alter: AlterTableCommand =>
checkAlterTableCommand(alter)
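// E.g. (hypothetically) `DECLARE VARIABLE v INT DEFAULT (SELECT 1)` is rejected
// below: variable default expressions may not contain subqueries.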
case c: CreateVariable
if c.resolved && c.defaultExpr.child.containsPattern(PLAN_EXPRESSION) =>
val ident = c.name.asInstanceOf[ResolvedIdentifier]
val varName = toSQLId(
(ident.catalog.name +: ident.identifier.namespace :+ ident.identifier.name)
.toImmutableArraySeq)
throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
"DECLARE VARIABLE",
varName,
c.defaultExpr.originalSQL)
case c: Call if c.resolved && c.bound && c.checkArgTypes().isFailure =>
c.checkArgTypes() match {
case mismatch: TypeCheckResult.DataTypeMismatch =>
c.dataTypeMismatch("CALL", mismatch)
case _ =>
throw SparkException.internalError("Invalid input for procedure")
}
case _ => // Falls back to the following checks
}
operator match {
case o if o.children.nonEmpty && o.missingInput.nonEmpty =>
val missingAttributes = o.missingInput.map(attr => toSQLExpr(attr)).mkString(", ")
val input = o.inputSet.map(attr => toSQLExpr(attr)).mkString(", ")
val resolver = plan.conf.resolver
val attrsWithSameName = o.missingInput.filter { missing =>
o.inputSet.exists(input => resolver(missing.name, input.name))
}
if (attrsWithSameName.nonEmpty) {
val sameNames = attrsWithSameName.map(attr => toSQLExpr(attr)).mkString(", ")
o.failAnalysis(
errorClass = "MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION",
messageParameters = Map(
"missingAttributes" -> missingAttributes,
"input" -> input,
"operator" -> operator.simpleString(SQLConf.get.maxToStringFields),
"operation" -> sameNames
))
} else {
o.failAnalysis(
errorClass = "MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_MISSING_FROM_INPUT",
messageParameters = Map(
"missingAttributes" -> missingAttributes,
"input" -> input,
"operator" -> operator.simpleString(SQLConf.get.maxToStringFields)
))
}
case p @ Project(projectList, _) =>
checkForUnspecifiedWindow(projectList)
case agg @ Aggregate(_, aggregateExpressions, _, _) if
PlanHelper.specialExpressionsInUnsupportedOperator(agg).isEmpty =>
checkForUnspecifiedWindow(aggregateExpressions)
case j: Join if !j.duplicateResolved =>
val conflictingAttributes =
j.left.outputSet.intersect(j.right.outputSet).map(toSQLExpr(_)).mkString(", ")
throw SparkException.internalError(
msg = s"""
|Failure when resolving conflicting references in ${j.nodeName}:
|${planToString(plan)}
|Conflicting attributes: $conflictingAttributes.""".stripMargin,
context = j.origin.getQueryContext,
summary = j.origin.context.summary)
case i: Intersect if !i.duplicateResolved =>
val conflictingAttributes =
i.left.outputSet.intersect(i.right.outputSet).map(toSQLExpr(_)).mkString(", ")
throw SparkException.internalError(
msg = s"""
|Failure when resolving conflicting references in ${i.nodeName}:
|${planToString(plan)}
|Conflicting attributes: $conflictingAttributes.""".stripMargin,
context = i.origin.getQueryContext,
summary = i.origin.context.summary)
case e: Except if !e.duplicateResolved =>
val conflictingAttributes =
e.left.outputSet.intersect(e.right.outputSet).map(toSQLExpr(_)).mkString(", ")
throw SparkException.internalError(
msg = s"""
|Failure when resolving conflicting references in ${e.nodeName}:
|${planToString(plan)}
|Conflicting attributes: $conflictingAttributes.""".stripMargin,
context = e.origin.getQueryContext,
summary = e.origin.context.summary)
case j: AsOfJoin if !j.duplicateResolved =>
val conflictingAttributes =
j.left.outputSet.intersect(j.right.outputSet).map(toSQLExpr(_)).mkString(", ")
throw SparkException.internalError(
msg = s"""
|Failure when resolving conflicting references in ${j.nodeName}:
|${planToString(plan)}
|Conflicting attributes: $conflictingAttributes.""".stripMargin,
context = j.origin.getQueryContext,
summary = j.origin.context.summary)
// TODO: although the map type is not orderable, it should technically be usable in
// equality comparisons; remove this type check once we support that.
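// E.g. `SELECT map(1, 2) EXCEPT SELECT map(1, 2)` is rejected: set operations need
// to compare rows, and map columns cannot be compared yet.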
case o if mapColumnInSetOperation(o).isDefined =>
val mapCol = mapColumnInSetOperation(o).get
throw QueryCompilationErrors.unsupportedSetOperationOnMapType(
mapCol = mapCol,
origin = operator.origin
)
// TODO: Remove this type check once we support Variant ordering
case o if variantColumnInSetOperation(o).isDefined =>
val variantCol = variantColumnInSetOperation(o).get
throw QueryCompilationErrors.unsupportedSetOperationOnVariantType(
variantCol = variantCol,
origin = operator.origin
)
case o if variantExprInPartitionExpression(o).isDefined =>
val variantExpr = variantExprInPartitionExpression(o).get
o.failAnalysis(
errorClass = "UNSUPPORTED_FEATURE.PARTITION_BY_VARIANT",
messageParameters = Map(
"expr" -> toSQLExpr(variantExpr),
"dataType" -> toSQLType(variantExpr.dataType)))
case o if o.expressions.exists(!_.deterministic) &&
!operatorAllowsNonDeterministicExpressions(o) &&
!o.isInstanceOf[Project] &&
// non-deterministic expressions inside CollectMetrics have already been
// validated by the checkMetric function above
!o.isInstanceOf[CollectMetrics] &&
!o.isInstanceOf[Filter] &&
!o.isInstanceOf[Aggregate] &&
!o.isInstanceOf[Window] &&
!o.isInstanceOf[Expand] &&
!o.isInstanceOf[Generate] &&
!o.isInstanceOf[CreateVariable] &&
!o.isInstanceOf[MapInPandas] &&
!o.isInstanceOf[MapInArrow] &&
// Lateral join is checked in checkSubqueryExpression.
!o.isInstanceOf[LateralJoin] =>
// Operators excluded above either legitimately contain non-deterministic
// expressions or validate them elsewhere (e.g. Aggregate is checked by
// assertValidAggregation above).
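// E.g. (hypothetically) a join condition like `t1 JOIN t2 ON rand() > 0.5` fails
// here with INVALID_NON_DETERMINISTIC_EXPRESSIONS.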
o.failAnalysis(
errorClass = "INVALID_NON_DETERMINISTIC_EXPRESSIONS",
messageParameters = Map("sqlExprs" -> o.expressions.map(toSQLExpr(_)).mkString(", "))
)
case _: UnresolvedHint => throw SparkException.internalError(
"Logical hint operator should be removed during analysis.")
case f @ Filter(condition, _)
if PlanHelper.specialExpressionsInUnsupportedOperator(f).nonEmpty =>
val invalidExprSqls = PlanHelper.specialExpressionsInUnsupportedOperator(f).map(_.sql)
f.failAnalysis(
errorClass = "INVALID_WHERE_CONDITION",
messageParameters = Map(
"condition" -> toSQLExpr(condition),
"expressionList" -> invalidExprSqls.mkString(", ")))
case other if PlanHelper.specialExpressionsInUnsupportedOperator(other).nonEmpty =>
val invalidExprSqls =
PlanHelper.specialExpressionsInUnsupportedOperator(other).map(toSQLExpr)
other.failAnalysis(
errorClass = "UNSUPPORTED_EXPR_FOR_OPERATOR",
messageParameters = Map(
"invalidExprSqls" -> invalidExprSqls.mkString(", ")))
case _ => // Analysis successful!
}
}
checkCollectedMetrics(plan)
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
case o if !o.resolved =>
throw SparkException.internalError(
msg = s"Found the unresolved operator: ${o.simpleString(SQLConf.get.maxToStringFields)}",
context = o.origin.getQueryContext,
summary = o.origin.context.summary)
// If the plan is resolved, all lateral column alias references should have been
// either restored or resolved. This check is an extra safety net.
case o if o.expressions.exists(_.containsPattern(LATERAL_COLUMN_ALIAS_REFERENCE)) =>
checkNotContainingLCA(o.expressions, o)
case _ =>
}
}