def createConverterToSQL(schema: Schema): Any => Any

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/SchemaConverters.scala [153:247]

  // Imports the excerpt relies on (from the top of the enclosing file, inferred from usage):
  //   import java.nio.ByteBuffer
  //   import java.util
  //   import java.util.HashMap
  //   import org.apache.avro.Schema
  //   import org.apache.avro.Schema.Type._
  //   import org.apache.avro.generic.GenericData.Fixed
  //   import org.apache.avro.generic.{GenericData, GenericRecord}
  //   import org.apache.spark.sql.Row
  //   import scala.collection.JavaConversions._
  // SchemaConversionException is defined elsewhere in the same file.
  def createConverterToSQL(schema: Schema): Any => Any = {
    schema.getType match {
      // Avro strings are in Utf8, so we have to call toString on them
      case STRING | ENUM => (item: Any) => if (item == null) null else item.toString
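      // Primitive Avro types map directly to Catalyst values, so no conversion is needed.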
      case INT | BOOLEAN | DOUBLE | FLOAT | LONG => identity
      // Byte arrays are reused by Avro, so we have to make a copy of them.
      case FIXED =>
        (item: Any) =>
          if (item == null) {
            null
          } else {
            item.asInstanceOf[Fixed].bytes().clone()
          }
      case BYTES =>
        (item: Any) =>
          if (item == null) {
            null
          } else {
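            // Copy the buffer's remaining bytes into a fresh array; note that get() advances the buffer's position.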
            val bytes = item.asInstanceOf[ByteBuffer]
            val javaBytes = new Array[Byte](bytes.remaining)
            bytes.get(javaBytes)
            javaBytes
          }
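      // Records convert recursively: build one converter per field up front, then apply them positionally to produce a Row.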
      case RECORD =>
        val fieldConverters = schema.getFields.map(f => createConverterToSQL(f.schema))
        (item: Any) =>
          if (item == null) {
            null
          } else {
            val record = item.asInstanceOf[GenericRecord]
            val converted = new Array[Any](fieldConverters.size)
            var idx = 0
            while (idx < fieldConverters.size) {
              converted(idx) = fieldConverters.apply(idx)(record.get(idx))
              idx += 1
            }
            Row.fromSeq(converted.toSeq)
          }
      case ARRAY =>
        val elementConverter = createConverterToSQL(schema.getElementType)
        (item: Any) =>
          if (item == null) {
            null
          } else {
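            // Depending on how the data was read, the array may be a GenericData.Array or a plain java.util.ArrayList; fall back to the latter if the first cast fails.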
            try {
              item.asInstanceOf[GenericData.Array[Any]].map(elementConverter)
            } catch {
              case _: Throwable =>
                item.asInstanceOf[util.ArrayList[Any]].map(elementConverter)
            }
          }
      case MAP =>
        val valueConverter = createConverterToSQL(schema.getValueType)
        (item: Any) =>
          if (item == null) {
            null
          } else {
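            // Map keys may arrive as Utf8, so normalize them to String before converting each value.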
            item
              .asInstanceOf[HashMap[Any, Any]]
              .map(x => (x._1.toString, valueConverter(x._2)))
              .toMap
          }
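      // A union containing NULL is just a nullable version of the remaining type(s): strip NULL and recurse.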
      case UNION =>
        if (schema.getTypes.exists(_.getType == NULL)) {
          val remainingUnionTypes = schema.getTypes.filterNot(_.getType == NULL)
          if (remainingUnionTypes.size == 1) {
            createConverterToSQL(remainingUnionTypes.get(0))
          } else {
            createConverterToSQL(Schema.createUnion(remainingUnionTypes))
          }
        } else
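          // Without a NULL branch, only the numeric widenings INT|LONG -> Long and FLOAT|DOUBLE -> Double are supported.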
          schema.getTypes.map(_.getType) match {
            case Seq(t1, t2) if Set(t1, t2) == Set(INT, LONG) =>
              (item: Any) => {
                item match {
                  case l: Long => l
                  case i: Int => i.toLong
                  case null => null
                }
              }
            case Seq(t1, t2) if Set(t1, t2) == Set(FLOAT, DOUBLE) =>
              (item: Any) => {
                item match {
                  case d: Double => d
                  case f: Float => f.toDouble
                  case null => null
                }
              }
            case other =>
              throw new SchemaConversionException(
                s"This mix of union types is not supported (see README): $other")
          }
      case other => throw new SchemaConversionException(s"invalid avro type: $other")
    }
  }
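
Usage sketch (not from the source file): assuming the enclosing SchemaConverters object exposes this method, the closure returned for a RECORD schema turns an Avro GenericRecord into a Spark SQL Row. The Person schema and field values below are illustrative only.

  import org.apache.avro.SchemaBuilder
  import org.apache.avro.generic.GenericData
  import org.apache.spark.sql.Row

  // Illustrative schema: one string field and one int field.
  val personSchema = SchemaBuilder.record("Person").fields()
    .requiredString("name")
    .requiredInt("age")
    .endRecord()

  val record = new GenericData.Record(personSchema)
  record.put("name", "Ada")
  record.put("age", 36)

  // The schema is walked once; the returned closure is reusable across records.
  val toRow = SchemaConverters.createConverterToSQL(personSchema)
  val row = toRow(record).asInstanceOf[Row] // Row("Ada", 36)

The converter is built once per schema rather than once per value, so the per-record path is just a chain of closure calls with no repeated schema inspection.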