in sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala [118:406]
/**
 * Converts a single Avro datum to its Catalyst representation by delegating to the
 * `converter` function built for this deserializer.
 *
 * NOTE(review): presumably `None` signals that the record was skipped rather than
 * converted; confirm against how `converter` is constructed.
 *
 * @param data the Avro value to convert
 * @return whatever `converter` yields for `data`
 */
def deserialize(data: Any): Option[Any] = {
  val converted = converter(data)
  converted
}
/**
 * Creates a writer that converts an Avro value of `avroType` into its Catalyst representation
 * for `catalystType` and stores it through a [[CatalystDataUpdater]] at a given ordinal.
 *
 * The Avro/Catalyst type pairing is resolved once, when this method runs. The returned closure
 * only performs the per-value conversion. Most unsupported pairings fail here with an
 * [[IncompatibleSchemaException]]. When `preventReadingIncorrectType` is enabled, pairings that
 * would misread the data fail with `QueryCompilationErrors.avroIncompatibleReadError`.
 *
 * @param avroType     Avro schema of the values the writer will receive
 * @param catalystType Catalyst type the values are converted to
 * @param avroPath     path of the field inside the Avro schema (used in error messages)
 * @param catalystPath path of the field inside the Catalyst schema (used in error messages)
 * @return a function `(updater, ordinal, value) => Unit` that writes the converted `value`
 *         into `updater` at `ordinal`
 */
private def newWriter(
    avroType: Schema,
    catalystType: DataType,
    avroPath: Seq[String],
    catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
  val errorPrefix = s"Cannot convert Avro ${toFieldStr(avroPath)} to " +
    s"SQL ${toFieldStr(catalystPath)} because "
  val incompatibleMsg = errorPrefix +
    s"schema is incompatible (avroType = $avroType, sqlType = ${catalystType.sql})"
  // The SQL type this Avro schema would naturally map to. It is used to detect reads that
  // would silently reinterpret the data (e.g. an interval read as a timestamp).
  val realDataType = SchemaConverters.toSqlType(
    avroType, useStableIdForUnionType, stableIdPrefixForUnionType,
    recursiveFieldMaxDepth).dataType

  // Returns the `decimal` logical type required by the FIXED/BYTES -> DecimalType paths.
  // Without this check, a missing or different logical type surfaces later as an NPE or a
  // ClassCastException instead of a schema-incompatibility error.
  def decimalLogicalType(): LogicalTypes.Decimal = avroType.getLogicalType match {
    case d: LogicalTypes.Decimal => d
    case _ => throw new IncompatibleSchemaException(incompatibleMsg)
  }

  // When `preventReadingIncorrectType` is enabled, rejects reading an Avro decimal into a SQL
  // decimal whose integral part (precision - scale) is narrower than the Avro one.
  def checkDecimalPrecision(d: LogicalTypes.Decimal, dt: DecimalType): Unit = {
    if (preventReadingIncorrectType &&
        d.getPrecision - d.getScale > dt.precision - dt.scale) {
      throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
        toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)
    }
  }

  (avroType.getType, catalystType) match {
    case (NULL, NullType) => (updater, ordinal, _) =>
      updater.setNullAt(ordinal)

    // TODO: we can avoid boxing if future version of avro provide primitive accessors.
    case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
      updater.setBoolean(ordinal, value.asInstanceOf[Boolean])

    case (INT, IntegerType) => (updater, ordinal, value) =>
      updater.setInt(ordinal, value.asInstanceOf[Int])

    case (INT, LongType) => (updater, ordinal, value) =>
      updater.setLong(ordinal, value.asInstanceOf[Int])

    case (INT, DoubleType) => (updater, ordinal, value) =>
      updater.setDouble(ordinal, value.asInstanceOf[Int])

    // An Avro INT whose natural SQL type is a year-month interval must not be reinterpreted
    // as a date/timestamp.
    case (INT, dt: DatetimeType)
        if preventReadingIncorrectType && realDataType.isInstanceOf[YearMonthIntervalType] =>
      throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
        toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)

    case (INT, DateType) => (updater, ordinal, value) =>
      updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))

    case (INT, TimestampNTZType) if avroType.getLogicalType.isInstanceOf[LogicalTypes.Date] =>
      (updater, ordinal, value) =>
        val days = dateRebaseFunc(value.asInstanceOf[Int])
        val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC)
        updater.setLong(ordinal, micros)

    // An Avro LONG whose natural SQL type is a day-time interval must not be reinterpreted
    // as a date/timestamp.
    case (LONG, dt: DatetimeType)
        if preventReadingIncorrectType && realDataType.isInstanceOf[DayTimeIntervalType] =>
      throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
        toFieldStr(catalystPath), realDataType.catalogString, dt.catalogString)

    case (LONG, LongType) => (updater, ordinal, value) =>
      updater.setLong(ordinal, value.asInstanceOf[Long])

    case (LONG, TimestampType) => avroType.getLogicalType match {
      // For backward compatibility, if the Avro type is Long and it is not logical type
      // (the `null` case), the value is processed as timestamp type with millisecond precision.
      case null | _: TimestampMillis => (updater, ordinal, value) =>
        val millis = value.asInstanceOf[Long]
        val micros = DateTimeUtils.millisToMicros(millis)
        updater.setLong(ordinal, timestampRebaseFunc(micros))
      case _: TimestampMicros => (updater, ordinal, value) =>
        val micros = value.asInstanceOf[Long]
        updater.setLong(ordinal, timestampRebaseFunc(micros))
      case other => throw new IncompatibleSchemaException(errorPrefix +
        s"Avro logical type $other cannot be converted to SQL type ${TimestampType.sql}.")
    }

    case (LONG, TimestampNTZType) => avroType.getLogicalType match {
      // To keep consistent with TimestampType, if the Avro type is Long and it is not
      // logical type (the `null` case), the value is processed as TimestampNTZ
      // with millisecond precision.
      case null | _: LocalTimestampMillis => (updater, ordinal, value) =>
        val millis = value.asInstanceOf[Long]
        val micros = DateTimeUtils.millisToMicros(millis)
        updater.setLong(ordinal, micros)
      case _: LocalTimestampMicros => (updater, ordinal, value) =>
        val micros = value.asInstanceOf[Long]
        updater.setLong(ordinal, micros)
      case other => throw new IncompatibleSchemaException(errorPrefix +
        s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.")
    }

    // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date.
    // For backward compatibility, we still keep this conversion. The value is treated as
    // milliseconds since the epoch. `floorDiv` rounds toward negative infinity, so pre-1970
    // instants map to the day they fall on instead of being rounded up toward the epoch.
    case (LONG, DateType) => (updater, ordinal, value) =>
      updater.setInt(ordinal, Math.floorDiv(value.asInstanceOf[Long], MILLIS_PER_DAY).toInt)

    case (FLOAT, FloatType) => (updater, ordinal, value) =>
      updater.setFloat(ordinal, value.asInstanceOf[Float])

    case (FLOAT, DoubleType) => (updater, ordinal, value) =>
      updater.setDouble(ordinal, value.asInstanceOf[Float])

    case (DOUBLE, DoubleType) => (updater, ordinal, value) =>
      updater.setDouble(ordinal, value.asInstanceOf[Double])

    case (STRING, StringType) => (updater, ordinal, value) =>
      val str = value match {
        case s: String => UTF8String.fromString(s)
        case s: Utf8 =>
          // Copy only the valid prefix: the Utf8 backing array may be larger than its length.
          val bytes = new Array[Byte](s.getByteLength)
          System.arraycopy(s.getBytes, 0, bytes, 0, s.getByteLength)
          UTF8String.fromBytes(bytes)
      }
      updater.set(ordinal, str)

    case (ENUM, StringType) => (updater, ordinal, value) =>
      updater.set(ordinal, UTF8String.fromString(value.toString))

    case (FIXED, BinaryType) => (updater, ordinal, value) =>
      updater.set(ordinal, value.asInstanceOf[GenericFixed].bytes().clone())

    case (BYTES, BinaryType) => (updater, ordinal, value) =>
      val bytes = value match {
        case b: ByteBuffer =>
          val copy = new Array[Byte](b.remaining)
          // Read through a duplicate so the caller's buffer position is left untouched.
          b.duplicate().get(copy)
          copy
        case b: Array[Byte] => b
        case other =>
          throw new RuntimeException(errorPrefix + s"$other is not a valid avro binary.")
      }
      updater.set(ordinal, bytes)

    case (FIXED, dt: DecimalType) =>
      val d = decimalLogicalType()
      checkDecimalPrecision(d, dt)
      (updater, ordinal, value) =>
        val bigDecimal =
          decimalConversions.fromFixed(value.asInstanceOf[GenericFixed], avroType, d)
        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
        updater.setDecimal(ordinal, decimal)

    case (BYTES, dt: DecimalType) =>
      val d = decimalLogicalType()
      checkDecimalPrecision(d, dt)
      (updater, ordinal, value) =>
        val bigDecimal = decimalConversions.fromBytes(value.asInstanceOf[ByteBuffer], avroType, d)
        val decimal = createDecimal(bigDecimal, d.getPrecision, d.getScale)
        updater.setDecimal(ordinal, decimal)

    case (RECORD, st: StructType) =>
      // Avro datasource doesn't accept filters with nested attributes. See SPARK-32328.
      // We can always return `false` from `applyFilters` for nested records.
      val writeRecord =
        getRecordWriter(avroType, st, avroPath, catalystPath, applyFilters = _ => false)
      (updater, ordinal, value) =>
        val row = new SpecificInternalRow(st)
        writeRecord(new RowUpdater(row), value.asInstanceOf[GenericRecord])
        updater.set(ordinal, row)

    case (ARRAY, ArrayType(elementType, containsNull)) =>
      val avroElementPath = avroPath :+ "element"
      val elementWriter = newWriter(avroType.getElementType, elementType,
        avroElementPath, catalystPath :+ "element")
      (updater, ordinal, value) =>
        val collection = value.asInstanceOf[java.util.Collection[Any]]
        val result = createArrayData(elementType, collection.size())
        val elementUpdater = new ArrayDataUpdater(result)

        var i = 0
        val iter = collection.iterator()
        while (iter.hasNext) {
          val element = iter.next()
          if (element == null) {
            if (!containsNull) {
              throw new RuntimeException(
                s"Array value at path ${toFieldStr(avroElementPath)} is not allowed to be null")
            } else {
              elementUpdater.setNullAt(i)
            }
          } else {
            elementWriter(elementUpdater, i, element)
          }
          i += 1
        }

        updater.set(ordinal, result)

    case (MAP, MapType(keyType, valueType, valueContainsNull)) if keyType == StringType =>
      val keyWriter = newWriter(SchemaBuilder.builder().stringType(), StringType,
        avroPath :+ "key", catalystPath :+ "key")
      val valueWriter = newWriter(avroType.getValueType, valueType,
        avroPath :+ "value", catalystPath :+ "value")
      (updater, ordinal, value) =>
        val map = value.asInstanceOf[java.util.Map[AnyRef, AnyRef]]
        val keyArray = createArrayData(keyType, map.size())
        val keyUpdater = new ArrayDataUpdater(keyArray)
        val valueArray = createArrayData(valueType, map.size())
        val valueUpdater = new ArrayDataUpdater(valueArray)
        val iter = map.entrySet().iterator()
        var i = 0
        while (iter.hasNext) {
          val entry = iter.next()
          assert(entry.getKey != null)
          keyWriter(keyUpdater, i, entry.getKey)
          if (entry.getValue == null) {
            if (!valueContainsNull) {
              throw new RuntimeException(
                s"Map value at path ${toFieldStr(avroPath :+ "value")} is not allowed to be null")
            } else {
              valueUpdater.setNullAt(i)
            }
          } else {
            valueWriter(valueUpdater, i, entry.getValue)
          }
          i += 1
        }

        // The Avro map will never have null or duplicated map keys, it's safe to create a
        // ArrayBasedMapData directly here.
        updater.set(ordinal, new ArrayBasedMapData(keyArray, valueArray))

    case (UNION, _) =>
      val nonNullTypes = nonNullUnionBranches(avroType)
      val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
      if (nonNullTypes.nonEmpty) {
        if (nonNullTypes.length == 1) {
          // A nullable single-type union is read as that type.
          newWriter(nonNullTypes.head, catalystType, avroPath, catalystPath)
        } else {
          nonNullTypes.map(_.getType) match {
            case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType =>
              (updater, ordinal, value) =>
                value match {
                  case null => updater.setNullAt(ordinal)
                  case l: java.lang.Long => updater.setLong(ordinal, l)
                  case i: java.lang.Integer => updater.setLong(ordinal, i.longValue())
                }

            case Seq(a, b) if Set(a, b) == Set(FLOAT, DOUBLE) && catalystType == DoubleType =>
              (updater, ordinal, value) =>
                value match {
                  case null => updater.setNullAt(ordinal)
                  case d: java.lang.Double => updater.setDouble(ordinal, d)
                  case f: java.lang.Float => updater.setDouble(ordinal, f.doubleValue())
                }

            case _ =>
              catalystType match {
                // A general union is read as a struct with one field per non-null branch.
                // Only the field of the branch the value resolves to is populated.
                case st: StructType if st.length == nonNullTypes.size =>
                  val fieldWriters = nonNullTypes.zip(st.fields).map {
                    case (schema, field) =>
                      newWriter(schema, field.dataType, avroPath, catalystPath :+ field.name)
                  }.toArray
                  (updater, ordinal, value) => {
                    val row = new SpecificInternalRow(st)
                    val fieldUpdater = new RowUpdater(row)
                    val i = GenericData.get().resolveUnion(nonNullAvroType, value)
                    fieldWriters(i)(fieldUpdater, i, value)
                    updater.set(ordinal, row)
                  }

                case _ => throw new IncompatibleSchemaException(incompatibleMsg)
              }
          }
        }
      } else {
        (updater, ordinal, _) => updater.setNullAt(ordinal)
      }

    case (INT, _: YearMonthIntervalType) => (updater, ordinal, value) =>
      updater.setInt(ordinal, value.asInstanceOf[Int])

    case (LONG, _: DayTimeIntervalType) => (updater, ordinal, value) =>
      updater.setLong(ordinal, value.asInstanceOf[Long])

    // Resolve the custom decimal logical type once, not per row. Any other logical type
    // (or none) cannot be read as a decimal and is reported as an incompatible schema.
    case (LONG, _: DecimalType) => avroType.getLogicalType match {
      case d: CustomDecimal => (updater, ordinal, value) =>
        updater.setDecimal(ordinal, Decimal(value.asInstanceOf[Long], d.precision, d.scale))
      case _ => throw new IncompatibleSchemaException(incompatibleMsg)
    }

    case _ => throw new IncompatibleSchemaException(incompatibleMsg)
  }
}