private def createSerializer()

in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala [317:448]


  /**
   * Builds the Catalyst serializer expression for a value described by `enc`.
   *
   * The match is order-sensitive:
   *  - `AgnosticExpressionPathEncoder`s produce their own expression and are checked first.
   *  - Native encoders are checked next; their `input` is returned unchanged.
   *  - The Java/Kryo serialization codecs are checked before the generic
   *    `TransformingEncoder` case.
   *
   * @param enc   the agnostic encoder describing the external (JVM) value.
   * @param input expression that yields the external value to serialize.
   * @return an expression converting `input` into its Catalyst representation.
   */
  private def createSerializer(enc: AgnosticEncoder[_], input: Expression): Expression = {
    enc match {
      // Encoders that know how to build their own Catalyst expression.
      case pathEncoder: AgnosticExpressionPathEncoder[_] =>
        pathEncoder.toCatalyst(input)

      // Native encoders: the input expression is used as-is.
      case _ if isNativeEncoder(enc) =>
        input

      // Boxed primitives.
      case BoxedBooleanEncoder => createSerializerForBoolean(input)
      case BoxedByteEncoder => createSerializerForByte(input)
      case BoxedShortEncoder => createSerializerForShort(input)
      case BoxedIntEncoder => createSerializerForInteger(input)
      case BoxedLongEncoder => createSerializerForLong(input)
      case BoxedFloatEncoder => createSerializerForFloat(input)
      case BoxedDoubleEncoder => createSerializerForDouble(input)

      // Enumerations and character data.
      case JavaEnumEncoder(_) => createSerializerForJavaEnum(input)
      case ScalaEnumEncoder(_, _) => createSerializerForScalaEnum(input)
      case CharEncoder(len) => createSerializerForChar(input, len)
      case VarcharEncoder(len) => createSerializerForVarchar(input, len)
      case StringEncoder => createSerializerForString(input)

      // Decimals and big integers. A lenient Java decimal encoder uses the
      // `AnyDecimal` serializer; a strict one uses the `BigDecimal` serializer.
      case ScalaDecimalEncoder(decimalType) =>
        createSerializerForBigDecimal(input, decimalType)
      case JavaDecimalEncoder(decimalType, lenient) =>
        if (lenient) createSerializerForAnyDecimal(input, decimalType)
        else createSerializerForBigDecimal(input, decimalType)
      case ScalaBigIntEncoder | JavaBigIntEncoder =>
        createSerializerForBigInteger(input)

      // Intervals.
      case DayTimeIntervalEncoder => createSerializerForJavaDuration(input)
      case YearMonthIntervalEncoder => createSerializerForJavaPeriod(input)

      // Dates and timestamps: the lenient flag selects the `Any*` serializer, otherwise
      // the serializer specific to the external class is used.
      case DateEncoder(lenient) =>
        if (lenient) createSerializerForAnyDate(input)
        else createSerializerForSqlDate(input)
      case LocalDateEncoder(lenient) =>
        if (lenient) createSerializerForAnyDate(input)
        else createSerializerForJavaLocalDate(input)
      case TimestampEncoder(lenient) =>
        if (lenient) createSerializerForAnyTimestamp(input)
        else createSerializerForSqlTimestamp(input)
      case InstantEncoder(lenient) =>
        if (lenient) createSerializerForAnyTimestamp(input)
        else createSerializerForJavaInstant(input)
      case LocalDateTimeEncoder => createSerializerForLocalDateTime(input)
      case LocalTimeEncoder => createSerializerForLocalTime(input)

      case UDTEncoder(udt, udtClass) =>
        createSerializerForUserDefinedType(input, udt, udtClass)

      // Options are unwrapped and the inner value is serialized with the inner encoder.
      case OptionEncoder(inner) =>
        val unwrapped = UnwrapOption(externalDataTypeFor(inner), input)
        createSerializer(inner, unwrapped)

      // Arrays of primitives get a dedicated serializer; any other element type goes
      // through `serializerForArray` with lenient serialization disabled.
      case ArrayEncoder(elemEnc, elemNullable) =>
        if (!elemEnc.isPrimitive) {
          serializerForArray(elemEnc, elemNullable, input, lenientSerialization = false)
        } else {
          createSerializerForPrimitiveArray(input, elemEnc.dataType)
        }

      case IterableEncoder(collectionTag, elemEnc, elemNullable, lenient) =>
        // Catalyst has no set type, so a `Set` is converted to a `Seq` and stored as an
        // array. Set semantics are therefore only kept while the data is handled as
        // domain objects.
        val isSet =
          classOf[scala.collection.Set[_]].isAssignableFrom(collectionTag.runtimeClass)
        val sequence =
          if (isSet) Invoke(input, "toSeq", ObjectType(classOf[scala.collection.Seq[_]]))
          else input
        serializerForArray(elemEnc, elemNullable, sequence, lenient)

      case MapEncoder(_, keyEnc, valueEnc, valueNullable) =>
        val keyInfo = MapElementInformation(
          ObjectType(classOf[AnyRef]),
          nullable = keyEnc.nullable,
          validateAndSerializeElement(keyEnc, keyEnc.nullable))
        val valueInfo = MapElementInformation(
          ObjectType(classOf[AnyRef]),
          nullable = valueNullable,
          validateAndSerializeElement(valueEnc, valueNullable))
        createSerializerForMap(input, keyInfo, valueInfo)

      case ProductEncoder(_, fields, _) =>
        // SPARK-26730: `input` is not null here because of the `If` null guard built
        // around the object serializer. KnownNotNull is still required: without it, a
        // nullable nested struct input (e.g. StructType(IntegerType, StringType)) would
        // report nullable = true even for its IntegerType field, which is not wanted.
        val serializedFields = fields.map { f =>
          val accessor = Invoke(
            KnownNotNull(input),
            f.name,
            externalDataTypeFor(f.enc),
            returnNullable = f.nullable)
          f.name -> createSerializer(f.enc, accessor)
        }
        createSerializerForObject(input, serializedFields)

      case AgnosticEncoders.RowEncoder(fields) =>
        val serializedFields = fields.zipWithIndex.map { case (f, ordinal) =>
          val serialized = createSerializer(
            f.enc,
            ValidateExternalType(
              GetExternalRowField(input, ordinal, f.name),
              f.enc.dataType,
              lenientExternalDataTypeFor(f.enc)))
          val guarded =
            if (!f.nullable) {
              AssertNotNull(serialized)
            } else {
              // UDTs are stripped, so the field's declared data type can differ from
              // `serialized.dataType`; the null literal must use the latter.
              exprs.If(
                Invoke(input, "isNullAt", BooleanType, exprs.Literal(ordinal) :: Nil),
                exprs.Literal.create(null, serialized.dataType),
                serialized)
            }
          f.name -> guarded
        }
        createSerializerForObject(input, serializedFields)

      case JavaBeanEncoder(_, fields) =>
        val serializedFields = fields.map { f =>
          val getterCall = Invoke(
            KnownNotNull(input),
            f.readMethod.get,
            externalDataTypeFor(f.enc),
            propagateNull = f.nullable,
            returnNullable = f.nullable)
          f.name -> createSerializer(f.enc, getterCall)
        }
        createSerializerForObject(input, serializedFields)

      // Java and Kryo serialization codecs are handled by `EncodeUsingSerializer`.
      case TransformingEncoder(_, _, codec, _)
          if codec == JavaSerializationCodec || codec == KryoSerializationCodec =>
        EncodeUsingSerializer(input, kryo = codec == KryoSerializationCodec)

      // Any other codec: call its `encode` on the external value, then serialize the
      // result with the wrapped encoder.
      case TransformingEncoder(_, wrapped, codecProvider, _) =>
        val codecLiteral = Literal(codecProvider(), ObjectType(classOf[Codec[_, _]]))
        val encoded = Invoke(
          codecLiteral,
          "encode",
          externalDataTypeFor(wrapped),
          input :: Nil,
          propagateNull = input.nullable,
          returnNullable = input.nullable)
        createSerializer(wrapped, encoded)
    }
  }