in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala [346:417]
def createConverterToAvro(
dataType: DataType,
structName: String,
recordNamespace: String): (Any) => Any = {
dataType match {
case BinaryType =>
(item: Any) =>
item match {
case null => null
case bytes: Array[Byte] => ByteBuffer.wrap(bytes)
}
case ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | StringType |
BooleanType =>
identity
case _: DecimalType => (item: Any) => if (item == null) null else item.toString
case TimestampType =>
(item: Any) => if (item == null) null else item.asInstanceOf[Timestamp].getTime
case ArrayType(elementType, _) =>
val elementConverter = createConverterToAvro(elementType, structName, recordNamespace)
(item: Any) => {
if (item == null) {
null
} else {
val sourceArray = item.asInstanceOf[Seq[Any]]
val sourceArraySize = sourceArray.size
val targetArray = new util.ArrayList[Any](sourceArraySize)
var idx = 0
while (idx < sourceArraySize) {
targetArray.add(elementConverter(sourceArray(idx)))
idx += 1
}
targetArray
}
}
case MapType(StringType, valueType, _) =>
val valueConverter = createConverterToAvro(valueType, structName, recordNamespace)
(item: Any) => {
if (item == null) {
null
} else {
val javaMap = new HashMap[String, Any]()
item.asInstanceOf[Map[String, Any]].foreach {
case (key, value) =>
javaMap.put(key, valueConverter(value))
}
javaMap
}
}
case structType: StructType =>
val builder = SchemaBuilder.record(structName).namespace(recordNamespace)
val schema: Schema =
SchemaConverters.convertStructToAvro(structType, builder, recordNamespace)
val fieldConverters = structType.fields.map(
field => createConverterToAvro(field.dataType, field.name, recordNamespace))
(item: Any) => {
if (item == null) {
null
} else {
val record = new Record(schema)
val convertersIterator = fieldConverters.iterator
val fieldNamesIterator = dataType.asInstanceOf[StructType].fieldNames.iterator
val rowIterator = item.asInstanceOf[Row].toSeq.iterator
while (convertersIterator.hasNext) {
val converter = convertersIterator.next()
record.put(fieldNamesIterator.next(), converter(rowIterator.next()))
}
record
}
}
}
}