in hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala [111:276]
private def newConverter(catalystType: DataType,
avroType: Schema,
catalystPath: Seq[String],
avroPath: Seq[String]): Converter = {
val errorPrefix = s"Cannot convert SQL ${toFieldStr(catalystPath)} " +
s"to Avro ${toFieldStr(avroPath)} because "
(catalystType, avroType.getType) match {
case (NullType, NULL) =>
(getter, ordinal) => null
case (BooleanType, BOOLEAN) =>
(getter, ordinal) => getter.getBoolean(ordinal)
case (ByteType, INT) =>
(getter, ordinal) => getter.getByte(ordinal).toInt
case (ShortType, INT) =>
(getter, ordinal) => getter.getShort(ordinal).toInt
case (IntegerType, INT) =>
(getter, ordinal) => getter.getInt(ordinal)
case (LongType, LONG) =>
(getter, ordinal) => getter.getLong(ordinal)
case (FloatType, FLOAT) =>
(getter, ordinal) => getter.getFloat(ordinal)
case (DoubleType, DOUBLE) =>
(getter, ordinal) => getter.getDouble(ordinal)
case (d: DecimalType, FIXED)
if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
(getter, ordinal) =>
val decimal = getter.getDecimal(ordinal, d.precision, d.scale)
decimalConversions.toFixed(decimal.toJavaBigDecimal, avroType,
LogicalTypes.decimal(d.precision, d.scale))
case (d: DecimalType, BYTES)
if avroType.getLogicalType == LogicalTypes.decimal(d.precision, d.scale) =>
(getter, ordinal) =>
val decimal = getter.getDecimal(ordinal, d.precision, d.scale)
decimalConversions.toBytes(decimal.toJavaBigDecimal, avroType,
LogicalTypes.decimal(d.precision, d.scale))
case (StringType, ENUM) =>
val enumSymbols: Set[String] = avroType.getEnumSymbols.asScala.toSet
(getter, ordinal) =>
val data = getter.getUTF8String(ordinal).toString
if (!enumSymbols.contains(data)) {
throw new IncompatibleSchemaException(errorPrefix +
s""""$data" cannot be written since it's not defined in enum """ +
enumSymbols.mkString("\"", "\", \"", "\""))
}
new EnumSymbol(avroType, data)
case (StringType, STRING) =>
(getter, ordinal) => new Utf8(getter.getUTF8String(ordinal).getBytes)
case (BinaryType, FIXED) =>
val size = avroType.getFixedSize
(getter, ordinal) =>
val data: Array[Byte] = getter.getBinary(ordinal)
if (data.length != size) {
def len2str(len: Int): String = s"$len ${if (len > 1) "bytes" else "byte"}"
throw new IncompatibleSchemaException(errorPrefix + len2str(data.length) +
" of binary data cannot be written into FIXED type with size of " + len2str(size))
}
new Fixed(avroType, data)
case (BinaryType, BYTES) =>
(getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
case (DateType, INT) =>
(getter, ordinal) => dateRebaseFunc(getter.getInt(ordinal))
case (TimestampType, LONG) => avroType.getLogicalType match {
// For backward compatibility, if the Avro type is Long and it is not logical type
// (the `null` case), output the timestamp value as with millisecond precision.
case null | _: TimestampMillis => (getter, ordinal) =>
DateTimeUtils.microsToMillis(timestampRebaseFunc(getter.getLong(ordinal)))
case _: TimestampMicros => (getter, ordinal) =>
timestampRebaseFunc(getter.getLong(ordinal))
case other => throw new IncompatibleSchemaException(errorPrefix +
s"SQL type ${TimestampType.sql} cannot be converted to Avro logical type $other")
}
case (TimestampNTZType, LONG) => avroType.getLogicalType match {
// To keep consistent with TimestampType, if the Avro type is Long and it is not
// logical type (the `null` case), output the TimestampNTZ as long value
// in millisecond precision.
case null | _: LocalTimestampMillis => (getter, ordinal) =>
DateTimeUtils.microsToMillis(getter.getLong(ordinal))
case _: LocalTimestampMicros => (getter, ordinal) =>
getter.getLong(ordinal)
case other => throw new IncompatibleSchemaException(errorPrefix +
s"SQL type ${TimestampNTZType.sql} cannot be converted to Avro logical type $other")
}
case (ArrayType(et, containsNull), ARRAY) =>
val elementConverter = newConverter(
et, resolveNullableType(avroType.getElementType, containsNull),
catalystPath :+ "element", avroPath :+ "element")
(getter, ordinal) => {
val arrayData = getter.getArray(ordinal)
val len = arrayData.numElements()
val result = new Array[Any](len)
var i = 0
while (i < len) {
if (containsNull && arrayData.isNullAt(i)) {
result(i) = null
} else {
result(i) = elementConverter(arrayData, i)
}
i += 1
}
// avro writer is expecting a Java Collection, so we convert it into
// `ArrayList` backed by the specified array without data copying.
java.util.Arrays.asList(result: _*)
}
case (st: StructType, RECORD) =>
val structConverter = newStructConverter(st, avroType, catalystPath, avroPath)
val numFields = st.length
(getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))
////////////////////////////////////////////////////////////////////////////////////////////
// Following section is amended to the original (Spark's) implementation
// >>> BEGINS
////////////////////////////////////////////////////////////////////////////////////////////
case (st: StructType, UNION) =>
val unionConverter = newUnionConverter(st, avroType, catalystPath, avroPath)
val numFields = st.length
(getter, ordinal) => unionConverter(getter.getStruct(ordinal, numFields))
////////////////////////////////////////////////////////////////////////////////////////////
// <<< ENDS
////////////////////////////////////////////////////////////////////////////////////////////
case (MapType(kt, vt, valueContainsNull), MAP) if kt == StringType =>
val valueConverter = newConverter(
vt, resolveNullableType(avroType.getValueType, valueContainsNull),
catalystPath :+ "value", avroPath :+ "value")
(getter, ordinal) =>
val mapData = getter.getMap(ordinal)
val len = mapData.numElements()
val result = new java.util.HashMap[String, Any](len)
val keyArray = mapData.keyArray()
val valueArray = mapData.valueArray()
var i = 0
while (i < len) {
val key = keyArray.getUTF8String(i).toString
if (valueContainsNull && valueArray.isNullAt(i)) {
result.put(key, null)
} else {
result.put(key, valueConverter(valueArray, i))
}
i += 1
}
result
case (_: YearMonthIntervalType, INT) =>
(getter, ordinal) => getter.getInt(ordinal)
case (_: DayTimeIntervalType, LONG) =>
(getter, ordinal) => getter.getLong(ordinal)
case _ =>
throw new IncompatibleSchemaException(errorPrefix +
s"schema is incompatible (sqlType = ${catalystType.sql}, avroType = $avroType)")
}
}