in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala [317:448]
/**
 * Recursively builds a Catalyst serializer [[Expression]] that converts an external (JVM)
 * object, referenced by `input`, into Spark's internal row representation. The shape of the
 * produced expression tree is driven entirely by the structure of the given encoder.
 *
 * NOTE(review): case order matters here — the `AgnosticExpressionPathEncoder` delegation and
 * the `isNativeEncoder` pass-through must be tried before the specific encoder cases, and the
 * two guarded `TransformingEncoder` cases must be tried before the generic codec case.
 *
 * @param enc   encoder describing the external type being serialized
 * @param input expression that evaluates to the external object to serialize
 * @return a Catalyst expression producing the internal representation of `input`
 */
private def createSerializer(enc: AgnosticEncoder[_], input: Expression): Expression = enc match {
// Custom encoders provide their own external-to-Catalyst conversion; delegate entirely.
case ae: AgnosticExpressionPathEncoder[_] => ae.toCatalyst(input)
// Native types need no conversion: the external and internal representations coincide.
case _ if isNativeEncoder(enc) => input
// Boxed JVM primitives: unbox into the corresponding internal primitive value.
case BoxedBooleanEncoder => createSerializerForBoolean(input)
case BoxedByteEncoder => createSerializerForByte(input)
case BoxedShortEncoder => createSerializerForShort(input)
case BoxedIntEncoder => createSerializerForInteger(input)
case BoxedLongEncoder => createSerializerForLong(input)
case BoxedFloatEncoder => createSerializerForFloat(input)
case BoxedDoubleEncoder => createSerializerForDouble(input)
// Enums and strings.
case JavaEnumEncoder(_) => createSerializerForJavaEnum(input)
case ScalaEnumEncoder(_, _) => createSerializerForScalaEnum(input)
case CharEncoder(length) => createSerializerForChar(input, length)
case VarcharEncoder(length) => createSerializerForVarchar(input, length)
case StringEncoder => createSerializerForString(input)
// Decimal family. The boolean flag on JavaDecimalEncoder selects a lenient path that
// accepts more than one external decimal representation ("Any" variant).
case ScalaDecimalEncoder(dt) => createSerializerForBigDecimal(input, dt)
case JavaDecimalEncoder(dt, false) => createSerializerForBigDecimal(input, dt)
case JavaDecimalEncoder(dt, true) => createSerializerForAnyDecimal(input, dt)
case ScalaBigIntEncoder => createSerializerForBigInteger(input)
case JavaBigIntEncoder => createSerializerForBigInteger(input)
// Interval types map to java.time.Duration / java.time.Period externally.
case DayTimeIntervalEncoder => createSerializerForJavaDuration(input)
case YearMonthIntervalEncoder => createSerializerForJavaPeriod(input)
// Date/time types. The `true` variants are lenient: they accept either of the two
// external representations (e.g. java.sql.Date or java.time.LocalDate) at runtime.
case DateEncoder(true) | LocalDateEncoder(true) => createSerializerForAnyDate(input)
case DateEncoder(false) => createSerializerForSqlDate(input)
case LocalDateEncoder(false) => createSerializerForJavaLocalDate(input)
case TimestampEncoder(true) | InstantEncoder(true) => createSerializerForAnyTimestamp(input)
case TimestampEncoder(false) => createSerializerForSqlTimestamp(input)
case InstantEncoder(false) => createSerializerForJavaInstant(input)
case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
case LocalTimeEncoder => createSerializerForLocalTime(input)
// User-defined types delegate to the UDT's own serialize method.
case UDTEncoder(udt, udtClass) => createSerializerForUserDefinedType(input, udt, udtClass)
// Option[T]: unwrap to the underlying value (null when None), then serialize that value.
case OptionEncoder(valueEnc) =>
createSerializer(valueEnc, UnwrapOption(externalDataTypeFor(valueEnc), input))
// Arrays: primitive element arrays take a fast path with no per-element conversion.
case ArrayEncoder(elementEncoder, containsNull) =>
if (elementEncoder.isPrimitive) {
createSerializerForPrimitiveArray(input, elementEncoder.dataType)
} else {
serializerForArray(elementEncoder, containsNull, input, lenientSerialization = false)
}
case IterableEncoder(ctag, elementEncoder, containsNull, lenientSerialization) =>
val getter = if (classOf[scala.collection.Set[_]].isAssignableFrom(ctag.runtimeClass)) {
// There's no corresponding Catalyst type for `Set`, we serialize a `Set` to Catalyst array.
// Note that the property of `Set` is only kept when manipulating the data as domain object.
Invoke(input, "toSeq", ObjectType(classOf[scala.collection.Seq[_]]))
} else {
input
}
serializerForArray(elementEncoder, containsNull, getter, lenientSerialization)
// Maps: keys and values are validated and serialized element-by-element. Keys use the
// key encoder's own nullability; value nullability comes from the map encoder.
case MapEncoder(_, keyEncoder, valueEncoder, valueContainsNull) =>
createSerializerForMap(
input,
MapElementInformation(
ObjectType(classOf[AnyRef]),
nullable = keyEncoder.nullable,
validateAndSerializeElement(keyEncoder, keyEncoder.nullable)),
MapElementInformation(
ObjectType(classOf[AnyRef]),
nullable = valueContainsNull,
validateAndSerializeElement(valueEncoder, valueContainsNull))
)
// Scala case classes / tuples: serialize each field by invoking its accessor.
case ProductEncoder(_, fields, _) =>
val serializedFields = fields.map { field =>
// SPARK-26730 inputObject won't be null with If's guard below. And KnownNotNul
// is necessary here. Because for a nullable nested inputObject with struct data
// type, e.g. StructType(IntegerType, StringType), it will return nullable=true
// for IntegerType without KnownNotNull. And that's what we do not expect to.
val getter = Invoke(
KnownNotNull(input),
field.name,
externalDataTypeFor(field.enc),
returnNullable = field.nullable)
field.name -> createSerializer(field.enc, getter)
}
createSerializerForObject(input, serializedFields)
// External Rows: fields are accessed positionally and their runtime type is validated
// against the encoder's expected external type before serialization.
case AgnosticEncoders.RowEncoder(fields) =>
val serializedFields = fields.zipWithIndex.map { case (field, index) =>
val fieldValue = createSerializer(
field.enc,
ValidateExternalType(
GetExternalRowField(input, index, field.name),
field.enc.dataType,
lenientExternalDataTypeFor(field.enc)))
val convertedField = if (field.nullable) {
exprs.If(
Invoke(input, "isNullAt", BooleanType, exprs.Literal(index) :: Nil),
// Because we strip UDTs, `field.dataType` can be different from `fieldValue.dataType`.
// We should use `fieldValue.dataType` here.
exprs.Literal.create(null, fieldValue.dataType),
fieldValue
)
} else {
// Non-nullable fields fail fast with a descriptive error if the value is null.
AssertNotNull(fieldValue)
}
field.name -> convertedField
}
createSerializerForObject(input, serializedFields)
// Java beans: each field is read through its getter (readMethod), then serialized.
case JavaBeanEncoder(_, fields) =>
val serializedFields = fields.map { f =>
val fieldValue = Invoke(
KnownNotNull(input),
f.readMethod.get,
externalDataTypeFor(f.enc),
propagateNull = f.nullable,
returnNullable = f.nullable)
f.name -> createSerializer(f.enc, fieldValue)
}
createSerializerForObject(input, serializedFields)
// Transforming encoders backed by the built-in Java/Kryo codecs serialize the whole
// object to binary directly; these guarded cases must precede the generic one below.
case TransformingEncoder(_, _, codec, _) if codec == JavaSerializationCodec =>
EncodeUsingSerializer(input, kryo = false)
case TransformingEncoder(_, _, codec, _) if codec == KryoSerializationCodec =>
EncodeUsingSerializer(input, kryo = true)
// Generic transforming encoder: apply the user codec's `encode` to obtain the
// intermediate representation, then serialize that with the target encoder.
case TransformingEncoder(_, encoder, codecProvider, _) =>
val encoded = Invoke(
Literal(codecProvider(), ObjectType(classOf[Codec[_, _]])),
"encode",
externalDataTypeFor(encoder),
input :: Nil,
propagateNull = input.nullable,
returnNullable = input.nullable
)
createSerializer(encoder, encoded)
}