in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala [153:247]
/**
 * Builds a function that converts a value decoded with the given Avro schema into the
 * representation used on the SQL side (strings, primitives, byte arrays, `Row`s, sequences and
 * maps). The returned function maps `null` to `null` for every reference-typed branch.
 *
 * The schema is inspected once, when this method is called; nested converters (record fields,
 * array elements, map values, union members) are created up front so that the returned
 * function does no schema work per value.
 *
 * Unions are supported in three shapes only:
 *  - a union containing NULL is treated as the union of its remaining members (or as the single
 *    remaining member);
 *  - INT with LONG is widened to `Long`;
 *  - FLOAT with DOUBLE is widened to `Double`.
 *
 * @param schema the Avro schema describing the values the converter will receive
 * @return a function from the Avro value to the converted value
 * @throws SchemaConversionException if the schema (or a nested schema) has an unsupported type
 *                                   or an unsupported mix of union types
 */
def createConverterToSQL(schema: Schema): Any => Any = {
  schema.getType match {
    // Avro strings are in Utf8, so we have to call toString on them
    case STRING | ENUM => (item: Any) => if (item == null) null else item.toString
    case INT | BOOLEAN | DOUBLE | FLOAT | LONG => identity
    // Byte arrays are reused by avro, so we have to make a copy of them.
    case FIXED =>
      (item: Any) =>
        if (item == null) {
          null
        } else {
          item.asInstanceOf[Fixed].bytes().clone()
        }
    case BYTES =>
      (item: Any) =>
        if (item == null) {
          null
        } else {
          // Read through a duplicate so the caller's buffer keeps its position; a relative get
          // on the original would leave it drained and a second conversion of the same datum
          // would produce an empty array.
          val view = item.asInstanceOf[ByteBuffer].duplicate()
          val javaBytes = new Array[Byte](view.remaining)
          view.get(javaBytes)
          javaBytes
        }
    case RECORD =>
      // One converter per field, in schema order; indexed by field position below.
      val fieldConverters = schema.getFields.map(f => createConverterToSQL(f.schema)).toArray
      val fieldCount = fieldConverters.length
      (item: Any) =>
        if (item == null) {
          null
        } else {
          val record = item.asInstanceOf[GenericRecord]
          val converted = new Array[Any](fieldCount)
          var idx = 0
          while (idx < fieldCount) {
            converted(idx) = fieldConverters(idx)(record.get(idx))
            idx += 1
          }
          Row.fromSeq(converted.toSeq)
        }
    case ARRAY =>
      val elementConverter = createConverterToSQL(schema.getElementType)
      (item: Any) =>
        if (item == null) {
          null
        } else {
          // GenericData.Array and java.util.ArrayList are both java.util.List, so a single cast
          // covers them. No try/catch here: a failure raised by elementConverter must surface
          // as-is instead of being replaced by an unrelated ClassCastException from a retry.
          item.asInstanceOf[util.List[Any]].map(elementConverter)
        }
    case MAP =>
      val valueConverter = createConverterToSQL(schema.getValueType)
      (item: Any) =>
        if (item == null) {
          null
        } else {
          // Keys are stringified (Avro may hand them over as Utf8); values go through the
          // value-type converter.
          item
            .asInstanceOf[HashMap[Any, Any]]
            .map(x => (x._1.toString, valueConverter(x._2)))
            .toMap
        }
    case UNION =>
      if (schema.getTypes.exists(_.getType == NULL)) {
        // Nullable union: drop NULL and convert according to what is left. Each branch of this
        // method already maps null to null, so no extra wrapping is needed.
        val remainingUnionTypes = schema.getTypes.filterNot(_.getType == NULL)
        if (remainingUnionTypes.size == 1) {
          createConverterToSQL(remainingUnionTypes.get(0))
        } else {
          createConverterToSQL(Schema.createUnion(remainingUnionTypes))
        }
      } else {
        schema.getTypes.map(_.getType) match {
          // INT/LONG in either order: widen to Long.
          case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
            (item: Any) => {
              item match {
                case l: Long => l
                case i: Int => i.toLong
                case null => null
              }
            }
          // FLOAT/DOUBLE in either order: widen to Double.
          case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
            (item: Any) => {
              item match {
                case d: Double => d
                case f: Float => f.toDouble
                case null => null
              }
            }
          case other =>
            throw new SchemaConversionException(
              s"This mix of union types is not supported (see README): $other")
        }
      }
    case other => throw new SchemaConversionException(s"invalid avro type: $other")
  }
}