def deserialize(data: Any): Option[Any]

in hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala [99:318]


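  /**
   * Converts a single Avro datum into a Catalyst value. Returns Some(converted
   * value), or None when the record is rejected by pushed-down Avro filters
   * (applied while the record writer runs).
   */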
  def deserialize(data: Any): Option[Any] = converter(data)

  /**
   * Creates a writer that writes Avro values as Catalyst values at the given ordinal with the
   * given updater.
   */
  private def newWriter(avroType: Schema,
                        catalystType: DataType,
                        path: List[String]): (CatalystDataUpdater, Int, Any) => Unit =
    (avroType.getType, catalystType) match {
      case (NULL, NullType) => (updater, ordinal, _) =>
        updater.setNullAt(ordinal)

      // TODO: we can avoid boxing if a future version of Avro provides primitive accessors.
      case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
        updater.setBoolean(ordinal, value.asInstanceOf[Boolean])

      case (INT, IntegerType) => (updater, ordinal, value) =>
        updater.setInt(ordinal, value.asInstanceOf[Int])

      case (INT, DateType) => (updater, ordinal, value) =>
        updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))

      case (LONG, LongType) => (updater, ordinal, value) =>
        updater.setLong(ordinal, value.asInstanceOf[Long])

      case (LONG, TimestampType) => avroType.getLogicalType match {
        // For backward compatibility, if the Avro type is Long and it has no logical type
        // (the `null` case), the value is processed as a timestamp with millisecond precision.
        case null | _: TimestampMillis => (updater, ordinal, value) =>
          val millis = value.asInstanceOf[Long]
          val micros = DateTimeUtils.millisToMicros(millis)
          updater.setLong(ordinal, timestampRebaseFunc(micros))
        case _: TimestampMicros => (updater, ordinal, value) =>
          val micros = value.asInstanceOf[Long]
          updater.setLong(ordinal, timestampRebaseFunc(micros))
        case other => throw new IncompatibleSchemaException(
          s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
      }
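      // For example (illustrative): a plain long 1612137600000L is read as
      // epoch millis, and DateTimeUtils.millisToMicros turns it into
      // 1612137600000000L before the rebase function is applied.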

      // Before Avro was upgraded to 1.8 for logical type support, spark-avro converted Long to
      // Date. For backward compatibility, we still keep this conversion.
      case (LONG, DateType) => (updater, ordinal, value) =>
        updater.setInt(ordinal, (value.asInstanceOf[Long] / MILLIS_PER_DAY).toInt)
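        // For example (illustrative): 1612137600000L millis / MILLIS_PER_DAY
        // yields 18659 days since the epoch, i.e. the date 2021-02-01.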

      case (FLOAT, FloatType) => (updater, ordinal, value) =>
        updater.setFloat(ordinal, value.asInstanceOf[Float])

      case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
        updater.setDouble(ordinal, value.asInstanceOf[Double])

      case (STRING, StringType) => (updater, ordinal, value) =>
        val str = value match {
          case s: String => UTF8String.fromString(s)
          case s: Utf8 =>
            val bytes = new Array[Byte](s.getByteLength)
            System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
            UTF8String.fromBytes(bytes)
          case s: GenericData.EnumSymbol => UTF8String.fromString(s.toString)
        }
        updater.set(ordinal, str)

      case (ENUM, StringType) => (updater, ordinal, value) =>
        updater.set(ordinal, UTF8String.fromString(value.toString))

      case (FIXED, BinaryType) => (updater, ordinal, value) =>
        updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())

      case (BYTES, BinaryType) => (updater, ordinal, value) =>
        val bytes = value match {
          case b: ByteBuffer =>
            val bytes = new Array[Byte](b.remaining)
            b.get(bytes)
            // Do not forget to reset the position
            b.rewind()
            bytes
          case b: Array[Byte] => b
          case other => throw new RuntimeException(s"$other is not a valid Avro binary.")
        }
        updater.set(ordinal, bytes)

      case (FIXED, _: DecimalType) => (updater, ordinal, value) =>
        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
        val bigDecimal = decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
        updater.setDecimal(ordinal, decimal)

      case (BYTES, _: DecimalType) => (updater, ordinal, value) =>
        val d = avroType.getLogicalType.asInstanceOf[LogicalTypes.Decimal]
        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
        updater.setDecimal(ordinal, decimal)
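      // Worked example (illustrative): for an Avro decimal(9, 2), the bytes
      // 0x30 0x39 encode the unscaled value 12345, so the conversion above
      // yields java.math.BigDecimal("123.45") before it is wrapped into
      // Spark's Decimal by createDecimal.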

      case (RECORD, st: StructType) =>
        // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
        // We can always return `false` from `applyFilters` for nested records.
        val writeRecord = getRecordWriter(avroType, st, path, applyFilters = _ => false)
        (updater, ordinal, value) =>
          val row = new SpecificInternalRow(st)
          writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
          updater.set(ordinal, row)

      case (ARRAY, ArrayType(elementType, containsNull)) =>
        val elementWriter = newWriter(avroType.getElementType, elementType, path)
        (updater, ordinal, value) =>
          val collection = value.asInstanceOf[java.util.Collection[Any]]
          val result = createArrayData(elementType, collection.size())
          val elementUpdater = new ArrayDataUpdater(result)

          var i = 0
          val iter = collection.iterator()
          while (iter.hasNext) {
            val element = iter.next()
            if (element == null) {
              if (!containsNull) {
                throw new RuntimeException(s"Array value at path ${path.mkString(".")} is not " +
                  "allowed to be null")
              } else {
                elementUpdater.setNullAt(i)
              }
            } else {
              elementWriter(elementUpdater, i, element)
            }
            i += 1
          }

          updater.set(ordinal, result)

      case (MAP, MapType(keyType, valueType, valueContainsNull)) if keyType == StringType =>
        val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType, path)
        val valueWriter = newWriter(avroType.getValueType, valueType, path)
        (updater, ordinal, value) =>
          val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
          val keyArray = createArrayData(keyType, map.size())
          val keyUpdater = new ArrayDataUpdater(keyArray)
          val valueArray = createArrayData(valueType, map.size())
          val valueUpdater = new ArrayDataUpdater(valueArray)
          val iter = map.entrySet().iterator()
          var i = 0
          while (iter.hasNext) {
            val entry = iter.next()
            assert(entry.getKey != null)
            keyWriter(keyUpdater, i, entry.getKey)
            if (entry.getValue == null) {
              if (!valueContainsNull) {
                throw new RuntimeException(s"Map value at path ${path.mkString(".")} is not " +
                  "allowed to be null")
              } else {
                valueUpdater.setNullAt(i)
              }
            } else {
              valueWriter(valueUpdater, i, entry.getValue)
            }
            i += 1
          }

          // The Avro map will never have null or duplicated map keys, so it's safe to create an
          // ArrayBasedMapData directly here.
          updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))

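      // A union is handled in three steps below: a lone non-null branch is
      // unwrapped directly; the pairs (INT, LONG) and (FLOAT, DOUBLE) are
      // widened to LongType/DoubleType; any other union must map to a
      // StructType with one field per branch, e.g. (illustrative)
      // ["int", "string"] -> struct<member0: int, member1: string>, where only
      // the branch chosen by GenericData.resolveUnion is populated.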
      case (UNION, _) =>
        val allTypes = avroType.getTypes.asScala
        val nonNullTypes = allTypes.filter(_.getType != NULL)
        val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
        if (nonNullTypes.nonEmpty) {
          if (nonNullTypes.length == 1) {
            newWriter(nonNullTypes.head, catalystType, path)
          } else {
            nonNullTypes.map(_.getType).toSeq match {
              case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType =>
                (updater, ordinal, value) =>
                  value match {
                    case null => updater.setNullAt(ordinal)
                    case l: java.lang.Long => updater.setLong(ordinal, l)
                    case i: java.lang.Integer => updater.setLong(ordinal, i.longValue())
                  }

              case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && catalystType == DoubleType =>
                (updater, ordinal, value) =>
                  value match {
                    case null => updater.setNullAt(ordinal)
                    case d: java.lang.Double => updater.setDouble(ordinal, d)
                    case f: java.lang.Float => updater.setDouble(ordinal, f.doubleValue())
                  }

              case _ =>
                catalystType match {
                  case st: StructType if st.length == nonNullTypes.size =>
                    val fieldWriters = nonNullTypes.zip(st.fields).map {
                      case (schema, field) => newWriter(schema, field.dataType, path :+ field.name)
                    }.toArray
                    (updater, ordinal, value) => {
                      val row = new SpecificInternalRow(st)
                      val fieldUpdater = new RowUpdater(row)
                      val i = GenericData.get().resolveUnion(nonNullAvroType, value)
                      fieldWriters(i)(fieldUpdater, i, value)
                      updater.set(ordinal, row)
                    }

                  case _ =>
                    throw new IncompatibleSchemaException(
                      s"Cannot convert Avro to catalyst because schema at path " +
                        s"${path.mkString(".")} is not compatible " +
                        s"(avroType = $avroType, sqlType = $catalystType).\n" +
                        s"Source Avro schema: $rootAvroType.\n" +
                        s"Target Catalyst type: $rootCatalystType")
                }
            }
          }
        } else {
          (updater, ordinal, value) => updater.setNullAt(ordinal)
        }

      case _ =>
        throw new IncompatibleSchemaException(
          s"Cannot convert Avro to catalyst because schema at path ${path.mkString(".")} " +
            s"is not compatible (avroType = $avroType, sqlType = $catalystType).\n" +
            s"Source Avro schema: $rootAvroType.\n" +
            s"Target Catalyst type: $rootCatalystType")
    }
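
A minimal end-to-end sketch (illustrative; the AvroDeserializer constructor
shown is an assumption -- check the actual signature in this module, which may
also take a datetime rebase mode and pushed-down filters):

  import org.apache.avro.Schema
  import org.apache.avro.generic.GenericData
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.{LongType, StringType, StructType}

  val avroSchema = new Schema.Parser().parse(
    """{"type": "record", "name": "r", "fields": [
      |  {"name": "id", "type": "long"},
      |  {"name": "name", "type": ["null", "string"], "default": null}
      |]}""".stripMargin)
  val catalystType = new StructType().add("id", LongType).add("name", StringType)

  val record = new GenericData.Record(avroSchema)
  record.put("id", 42L)
  record.put("name", "hoodie")

  val deserializer = new AvroDeserializer(avroSchema, catalystType) // assumed ctor
  deserializer.deserialize(record) match {
    case Some(row: InternalRow) => assert(row.getLong(0) == 42L)
    case None => // record was rejected by pushed-down filters
  }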