in sql/core/src/main/scala/org/apache/spark/sql/classic/columnNodeSupport.scala [47:211]
override def apply(node: ColumnNode): Expression = SQLConf.withExistingConf(conf) {
  CurrentOrigin.withOrigin(node.origin) {
    node match {
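      // Literal values: when an explicit data type is given, convert the
      // external value to Catalyst's internal representation first.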
      case Literal(value, Some(dataType), _) =>
        val converter = CatalystTypeConverters.createToCatalystConverter(dataType)
        expressions.Literal(converter(value), dataType)

      case Literal(value, None, _) =>
        expressions.Literal(value)
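
      // Unresolved attributes, stars, and column regexes: these stay
      // unresolved here and are bound to actual columns by the analyzer.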
      case UnresolvedAttribute(nameParts, planId, isMetadataColumn, _) =>
        convertUnresolvedAttribute(nameParts, planId, isMetadataColumn)

      case UnresolvedStar(unparsedTarget, None, _) =>
        val target = unparsedTarget.map { t =>
          analysis.UnresolvedAttribute.parseAttributeName(t.stripSuffix(".*"))
        }
        analysis.UnresolvedStar(target)

      case UnresolvedStar(None, Some(planId), _) =>
        analysis.UnresolvedDataFrameStar(planId)

      case UnresolvedRegex(ParserUtils.escapedIdentifier(columnNameRegex), _, _) =>
        analysis.UnresolvedRegex(columnNameRegex, None, conf.caseSensitiveAnalysis)

      case UnresolvedRegex(
          ParserUtils.qualifiedEscapedIdentifier(nameParts, columnNameRegex), _, _) =>
        analysis.UnresolvedRegex(columnNameRegex, Some(nameParts), conf.caseSensitiveAnalysis)

      case UnresolvedRegex(unparsedIdentifier, planId, _) =>
        convertUnresolvedRegex(unparsedIdentifier, planId)
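
      // Function invocations: UDF names may be multi-part (e.g. db.fn), so
      // they go through the SQL parser; other names are used verbatim.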
      case UnresolvedFunction(functionName, arguments, isDistinct, isUDF, isInternal, _) =>
        val nameParts = if (isUDF) {
          parser.parseMultipartIdentifier(functionName)
        } else {
          Seq(functionName)
        }
        analysis.UnresolvedFunction(
          nameParts = nameParts,
          arguments = arguments.map(apply),
          isDistinct = isDistinct,
          isInternal = isInternal)
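
      // Aliases: a single name becomes an Alias (with or without explicit
      // metadata); multiple names become a MultiAlias.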
      case Alias(child, Seq(name), None, _) =>
        expressions.Alias(apply(child), name)(
          nonInheritableMetadataKeys = Seq(Dataset.DATASET_ID_KEY, Dataset.COL_POS_KEY))

      case Alias(child, Seq(name), metadata, _) =>
        expressions.Alias(apply(child), name)(explicitMetadata = metadata)

      case Alias(child, names, None, _) if names.nonEmpty =>
        analysis.MultiAlias(apply(child), names)
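
      // Casts: map the requested evaluation mode (falling back to the
      // session default) and tag the Cast as user-specified.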
      case Cast(child, dataType, evalMode, _) =>
        val convertedEvalMode = evalMode match {
          case Some(Cast.Ansi) => expressions.EvalMode.ANSI
          case Some(Cast.Legacy) => expressions.EvalMode.LEGACY
          case Some(Cast.Try) => expressions.EvalMode.TRY
          case _ => expressions.EvalMode.fromSQLConf(conf)
        }
        val cast = expressions.Cast(
          apply(child),
          CharVarcharUtils.replaceCharVarcharWithStringForCast(dataType),
          None,
          convertedEvalMode)
        cast.setTagValue(expressions.Cast.USER_SPECIFIED_CAST, ())
        cast
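
      // Raw SQL snippets and sort order nodes.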
      case SqlExpression(expression, _) =>
        parser.parseExpression(expression)

      case sortOrder: SortOrder =>
        convertSortOrder(sortOrder)
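
      // Window expressions: build the frame specification (if any) and wrap
      // the converted function in a WindowExpression.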
      case Window(function, spec, _) =>
        val frame = spec.frame match {
          case Some(WindowFrame(frameType, lower, upper)) =>
            val convertedFrameType = frameType match {
              case WindowFrame.Range => expressions.RangeFrame
              case WindowFrame.Row => expressions.RowFrame
            }
            expressions.SpecifiedWindowFrame(
              convertedFrameType,
              convertWindowFrameBoundary(lower),
              convertWindowFrameBoundary(upper))
          case None =>
            expressions.UnspecifiedFrame
        }
        expressions.WindowExpression(
          apply(function),
          expressions.WindowSpecDefinition(
            partitionSpec = spec.partitionColumns.map(apply),
            orderSpec = spec.sortColumns.map(convertSortOrder),
            frameSpecification = frame))
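
      // Lambda functions and their variables (arguments to higher-order
      // functions), plus value extraction from complex types.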
      case LambdaFunction(function, arguments, _) =>
        expressions.LambdaFunction(
          apply(function),
          arguments.map(convertUnresolvedNamedLambdaVariable))

      case v: UnresolvedNamedLambdaVariable =>
        convertUnresolvedNamedLambdaVariable(v)

      case UnresolvedExtractValue(child, extraction, _) =>
        analysis.UnresolvedExtractValue(apply(child), apply(extraction))
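
      // Struct field updates (withField/dropFields) and CASE WHEN chains.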
      case UpdateFields(struct, field, Some(value), _) =>
        expressions.UpdateFields(apply(struct), field, apply(value))

      case UpdateFields(struct, field, None, _) =>
        expressions.UpdateFields(apply(struct), field)

      case CaseWhenOtherwise(branches, otherwise, _) =>
        expressions.CaseWhen(
          branches = branches.map { case (condition, value) =>
            (apply(condition), apply(value))
          },
          elseValue = otherwise.map(apply))
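
      // Inline user-defined functions: typed Aggregators, UserDefinedAggregators,
      // legacy UserDefinedAggregateFunctions, and Scala UDFs.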
      case InvokeInlineUserDefinedFunction(
          a: Aggregator[Any @unchecked, Any @unchecked, Any @unchecked], Nil, isDistinct, _) =>
        TypedAggregateExpression(a)(a.bufferEncoder, a.outputEncoder)
          .toAggregateExpression(isDistinct)

      case InvokeInlineUserDefinedFunction(
          a: UserDefinedAggregator[Any @unchecked, Any @unchecked, Any @unchecked],
          arguments, isDistinct, _) =>
        ScalaAggregator(a, arguments.map(apply)).toAggregateExpression(isDistinct)

      case InvokeInlineUserDefinedFunction(
          a: UserDefinedAggregateFunction, arguments, isDistinct, _) =>
        ScalaUDAF(udaf = a, children = arguments.map(apply)).toAggregateExpression(isDistinct)

      case InvokeInlineUserDefinedFunction(udf: SparkUserDefinedFunction, arguments, _, _) =>
        toScalaUDF(udf, arguments.map(apply))
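
      // Catalyst expressions wrapped in a ColumnNode: recursively convert any
      // nested ColumnNodes, then turn a bare aggregate function into an
      // AggregateExpression.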
      case ExpressionColumnNode(expression, _) =>
        val transformed = expression.transformDown {
          case ColumnNodeExpression(node) => apply(node)
        }
        transformed match {
          case f: AggregateFunction => f.toAggregateExpression()
          case _ => transformed
        }

      case l: LazyExpression =>
        analysis.LazyExpression(apply(l.child))
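
      // Subqueries: scalar, EXISTS, and IN subqueries over another Dataset's
      // logical plan.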
      case SubqueryExpression(ds, subqueryType, _) =>
        subqueryType match {
          case SubqueryType.SCALAR =>
            expressions.ScalarSubquery(ds.logicalPlan)
          case SubqueryType.EXISTS =>
            expressions.Exists(ds.logicalPlan)
          case SubqueryType.IN(values) =>
            expressions.InSubquery(
              values.map(value => apply(value)), expressions.ListQuery(ds.logicalPlan))
        }
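
      // Any other node type indicates a bug in the caller.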
      case node =>
        throw SparkException.internalError("Unsupported ColumnNode: " + node)
    }
  }
}
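
For context, a minimal usage sketch, not part of the file above: it assumes the companion object ColumnNodeToExpressionConverter defined alongside this trait (which supplies the active SQLConf and parser), and that the caller sits inside the org.apache.spark.sql package, since the converter and the ColumnNode classes are package-private. The renamed imports and the literal example are illustrative assumptions only.

import org.apache.spark.sql.catalyst.{expressions => exprs}
import org.apache.spark.sql.classic.ColumnNodeToExpressionConverter
import org.apache.spark.sql.internal.{Literal => LiteralNode}
import org.apache.spark.sql.types.IntegerType

// Converting a typed literal ColumnNode hits the first case of the match
// above and yields a Catalyst Literal with the value in internal form.
val converted = ColumnNodeToExpressionConverter(LiteralNode(42, Some(IntegerType)))
assert(converted == exprs.Literal(42, IntegerType))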