in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala [278:475]
/**
 * Builds the expression that turns the Catalyst value found at `path` into the external
 * object described by `enc`.
 *
 * Leaf encoders map to a single helper call; container-like encoders (option, array,
 * iterable, map, product, row, Java bean, transforming) call this method recursively for
 * their nested encoders. Cases are tried top to bottom, so the order matters wherever a
 * guarded or more specific pattern precedes a more general one.
 *
 * @param enc encoder describing the external type to produce.
 * @param path expression yielding the Catalyst value to deserialize.
 * @param walkedTypePath types/fields visited so far; extended with one entry per nesting
 *                       level and handed on to the nested calls and path helpers.
 * @return the deserializer expression for `enc`.
 */
private def createDeserializer(
    enc: AgnosticEncoder[_],
    path: Expression,
    walkedTypePath: WalkedTypePath): Expression = {

  // Runtime class name of an encoder, as recorded in the walked type path.
  def classNameOf(e: AgnosticEncoder[_]): String = e.clsTag.runtimeClass.getName

  // Walked type path extended with a named field of the given encoder type.
  def fieldTypePath(fieldEnc: AgnosticEncoder[_], fieldName: String): WalkedTypePath =
    walkedTypePath.recordField(classNameOf(fieldEnc), fieldName)

  // Walked type path extended with a map of the given key/value encoder types.
  def mapTypePath(keyEnc: AgnosticEncoder[_], valueEnc: AgnosticEncoder[_]): WalkedTypePath =
    walkedTypePath.recordMap(classNameOf(keyEnc), classNameOf(valueEnc))

  enc match {
    // Encoders that supply their own deserialization expression.
    case ae: AgnosticExpressionPathEncoder[_] =>
      ae.fromCatalyst(path)

    // Native encoders: the input expression is returned unchanged.
    case _ if isNativeEncoder(enc) =>
      path

    // Boxed leaf types go through the valueOf-style helper with the encoder's runtime class.
    case _: BoxedLeafEncoder[_, _] =>
      createDeserializerForTypesSupportValueOf(path, enc.clsTag.runtimeClass)

    // Java enums: deserialize to a string first, then feed it to the valueOf-style helper.
    case JavaEnumEncoder(tag) =>
      createDeserializerForTypesSupportValueOf(
        createDeserializerForString(path, returnNullable = false),
        tag.runtimeClass)

    // Scala enumerations: statically invoke `withName` on `parent` with the string value.
    case ScalaEnumEncoder(parent, tag) =>
      StaticInvoke(
        parent,
        ObjectType(tag.runtimeClass),
        "withName",
        List(createDeserializerForString(path, returnNullable = false)),
        returnNullable = false)

    // ---- String-like, numeric, interval and date/time leaves: one helper call each. ----
    case CharEncoder(length) =>
      createDeserializerForChar(path, returnNullable = false, length)
    case VarcharEncoder(length) =>
      createDeserializerForVarchar(path, returnNullable = false, length)
    case StringEncoder =>
      createDeserializerForString(path, returnNullable = false)
    case _: ScalaDecimalEncoder =>
      createDeserializerForScalaBigDecimal(path, returnNullable = false)
    case _: JavaDecimalEncoder =>
      createDeserializerForJavaBigDecimal(path, returnNullable = false)
    case ScalaBigIntEncoder =>
      createDeserializerForScalaBigInt(path)
    case JavaBigIntEncoder =>
      createDeserializerForJavaBigInteger(path, returnNullable = false)
    case DayTimeIntervalEncoder =>
      createDeserializerForDuration(path)
    case YearMonthIntervalEncoder =>
      createDeserializerForPeriod(path)
    case _: DateEncoder =>
      createDeserializerForSqlDate(path)
    case _: LocalDateEncoder =>
      createDeserializerForLocalDate(path)
    case _: TimestampEncoder =>
      createDeserializerForSqlTimestamp(path)
    case _: InstantEncoder =>
      createDeserializerForInstant(path)
    case LocalDateTimeEncoder =>
      createDeserializerForLocalDateTime(path)
    case LocalTimeEncoder =>
      createDeserializerForLocalTime(path)

    // User-defined types: create the UDT with its no-arg constructor and call `deserialize`.
    case UDTEncoder(udt, udtClass) =>
      Invoke(
        NewInstance(udtClass, Nil, ObjectType(udtClass)),
        "deserialize",
        ObjectType(udt.userClass),
        List(path))

    // Options: deserialize the wrapped value, then wrap the result with WrapOption.
    case OptionEncoder(valueEnc) =>
      val optionPath = walkedTypePath.recordOption(classNameOf(valueEnc))
      WrapOption(createDeserializer(valueEnc, path, optionPath), externalDataTypeFor(valueEnc))

    // Arrays: deserialize the elements without a target collection class, then invoke the
    // method named by `toArrayMethodName` on the result.
    case ArrayEncoder(elementEnc: AgnosticEncoder[_], containsNull) =>
      val elements = deserializeArray(path, elementEnc, containsNull, None, walkedTypePath)
      Invoke(
        elements,
        toArrayMethodName(elementEnc),
        ObjectType(enc.clsTag.runtimeClass),
        returnNullable = false)

    // Other iterables: same as arrays, but the target collection class is passed along.
    case IterableEncoder(clsTag, elementEnc, containsNull, _) =>
      val collectionClass = Option(clsTag.runtimeClass)
      deserializeArray(path, elementEnc, containsNull, collectionClass, walkedTypePath)

    // java.util.Map targets: keys and values are deserialized separately into object arrays
    // and combined through ArrayBasedMapData.toJavaMap. Must precede the generic map case.
    case MapEncoder(tag, keyEncoder, valueEncoder, _)
        if classOf[java.util.Map[_, _]].isAssignableFrom(tag.runtimeClass) =>
      // TODO (hvanhovell) this can be improved.
      val entryPath = mapTypePath(keyEncoder, valueEncoder)

      // Deserializes every element of `elements` and exposes the result's `array`.
      def asObjectArray(elementEnc: AgnosticEncoder[_], elements: Expression): Expression =
        Invoke(
          UnresolvedMapObjects(e => createDeserializer(elementEnc, e, entryPath), elements),
          "array",
          ObjectType(classOf[Array[Any]]))

      val javaKeys = asObjectArray(keyEncoder, MapKeys(path))
      val javaValues = asObjectArray(valueEncoder, MapValues(path))
      StaticInvoke(
        ArrayBasedMapData.getClass,
        ObjectType(classOf[java.util.Map[_, _]]),
        "toJavaMap",
        List(javaKeys, javaValues),
        returnNullable = false)

    // All remaining map targets.
    case MapEncoder(tag, keyEncoder, valueEncoder, _) =>
      val entryPath = mapTypePath(keyEncoder, valueEncoder)
      UnresolvedCatalystToExternalMap(
        path,
        key => createDeserializer(keyEncoder, key, entryPath),
        value => createDeserializer(valueEncoder, value, entryPath),
        tag.runtimeClass)

    // Products: deserialize every field into a constructor argument; a null input yields a
    // null literal instead of a new instance.
    case ProductEncoder(tag, fields, outerPointerGetter) =>
      val productClass = tag.runtimeClass
      val productType = ObjectType(productClass)
      // Tuple fields are looked up by position, all other products by field name.
      val byOrdinal = productClass.getName.startsWith("scala.Tuple")
      val constructorArgs = fields.zipWithIndex.map { case (field, ordinal) =>
        val fieldPath = fieldTypePath(field.enc, field.name)
        val fieldValue = if (byOrdinal) {
          addToPathOrdinal(path, ordinal, field.enc.dataType, fieldPath)
        } else {
          addToPath(path, field.name, field.enc.dataType, fieldPath)
        }
        expressionWithNullSafety(
          createDeserializer(field.enc, fieldValue, fieldPath),
          field.enc.nullable,
          fieldPath)
      }
      exprs.If(
        IsNull(path),
        exprs.Literal.create(null, productType),
        NewInstance(
          productClass,
          constructorArgs,
          Nil,
          propagateNull = false,
          productType,
          outerPointerGetter))

    // Rows: when `path` is not typed as a StructType it is treated as an external row and
    // every field read is guarded by an `isNullAt` check on that row.
    case AgnosticEncoders.RowEncoder(fields) =>
      val pathIsStruct = path.dataType.isInstanceOf[StructType]
      val columns = fields.zipWithIndex.map { case (f, ordinal) =>
        val columnPath = fieldTypePath(f.enc, f.name)
        val column = createDeserializer(f.enc, GetStructField(path, ordinal), columnPath)
        if (pathIsStruct) {
          column
        } else {
          exprs.If(
            Invoke(path, "isNullAt", BooleanType, List(exprs.Literal(ordinal))),
            exprs.Literal.create(null, externalDataTypeFor(f.enc)),
            column)
        }
      }
      exprs.If(
        IsNull(path),
        exprs.Literal.create(null, externalDataTypeFor(enc)),
        CreateExternalRow(columns, enc.schema))

    // Java beans: only fields that have a write method get a setter expression; a null
    // input yields a null literal instead of an initialized bean.
    case JavaBeanEncoder(tag, fields) =>
      val propertySetters = fields.flatMap { f =>
        f.writeMethod.map { writeMethod =>
          val propertyPath = fieldTypePath(f.enc, f.name)
          val propertyValue = expressionWithNullSafety(
            createDeserializer(
              f.enc,
              addToPath(path, f.name, f.enc.dataType, propertyPath),
              propertyPath),
            nullable = f.nullable,
            propertyPath)
          writeMethod -> propertyValue
        }
      }
      val beanClass = tag.runtimeClass
      val emptyBean = NewInstance(beanClass, Nil, ObjectType(beanClass), propagateNull = false)
      val initializedBean = InitializeJavaBean(emptyBean, propertySetters.toMap)
      exprs.If(
        IsNull(path),
        exprs.Literal.create(null, ObjectType(beanClass)),
        initializedBean)

    // Transforming encoders: the Java-serialization and Kryo codecs are decoded with
    // DecodeUsingSerializer (checked in that order); any other codec provider is
    // instantiated and its `decode` is invoked on the deserialized underlying value.
    case TransformingEncoder(tag, encoder, provider, _) =>
      if (provider == JavaSerializationCodec) {
        DecodeUsingSerializer(path, tag, kryo = false)
      } else if (provider == KryoSerializationCodec) {
        DecodeUsingSerializer(path, tag, kryo = true)
      } else {
        Invoke(
          Literal.create(provider(), ObjectType(classOf[Codec[_, _]])),
          "decode",
          ObjectType(tag.runtimeClass),
          List(createDeserializer(encoder, path, walkedTypePath)))
      }
  }
}