in spark-datasource-v2.3/src/main/scala/org/apache/spark/sql/odps/converter/TypesConverter.scala [245:314]
/**
 * Builds a converter from a Spark-side value to the Java object used on the
 * ODPS side for a column of type `t`.
 *
 * The match on `t.getOdpsType` runs when this method is called, so an ODPS
 * type without a case below raises `scala.MatchError` at that point. Nested
 * element/field converters are only built when a non-null ARRAY, MAP or
 * STRUCT value is actually converted.
 *
 * Every returned converter maps `null` to `null`.
 *
 * @param t ODPS type descriptor of the target column
 * @return a function that takes the Spark value (boxed primitive, `Decimal`,
 *         byte array, string-like value, `ArrayData`, `MapData` or
 *         `InternalRow`, depending on `t`) and returns the ODPS value
 */
protected[odps] def sparkData2OdpsData(t: TypeInfo): Any => AnyRef = {
  t.getOdpsType match {
    // Boxed primitives pass through unchanged; a null reference stays null.
    case OdpsType.BOOLEAN => (v: Any) => v.asInstanceOf[java.lang.Boolean]
    case OdpsType.DOUBLE => (v: Any) => v.asInstanceOf[java.lang.Double]
    case OdpsType.BIGINT => (v: Any) => v.asInstanceOf[java.lang.Long]
    case OdpsType.FLOAT => (v: Any) => v.asInstanceOf[java.lang.Float]
    case OdpsType.INT => (v: Any) => v.asInstanceOf[java.lang.Integer]
    case OdpsType.SMALLINT => (v: Any) => v.asInstanceOf[java.lang.Short]
    case OdpsType.TINYINT => (v: Any) => v.asInstanceOf[java.lang.Byte]

    case OdpsType.DATETIME => (v: Any) =>
      // The input Long is divided by 1000 to obtain epoch milliseconds
      // (presumably Spark's microsecond timestamps). floorDiv keeps pre-epoch
      // instants from being rounded towards zero, i.e. one millisecond late.
      if (v != null) new java.util.Date(Math.floorDiv(v.asInstanceOf[Long], 1000L))
      else null

    case OdpsType.TIMESTAMP => (v: Any) =>
      if (v == null) null
      else {
        val micros = v.asInstanceOf[Long]
        val ts = new java.sql.Timestamp(Math.floorDiv(micros, 1000L))
        // Timestamp(millis) only carries millisecond precision; restore the
        // full sub-second fraction so the microsecond digits are not lost.
        ts.setNanos((Math.floorMod(micros, 1000000L) * 1000L).toInt)
        ts
      }

    case OdpsType.DATE => (v: Any) =>
      // The input Int is a day count; scale it to milliseconds as a Long.
      if (v != null) new java.sql.Date(v.asInstanceOf[Int].toLong * 86400000L)
      else null

    case OdpsType.STRING => (v: Any) =>
      // toString is the identity for java.lang.String and also accepts other
      // string-like values (presumably Spark's UTF8String, which the nested
      // array/map/struct accessors are expected to return -- confirm against
      // odpsType2SparkType).
      if (v != null) v.toString
      else null

    case OdpsType.VARCHAR => (v: Any) =>
      val ti = t.asInstanceOf[VarcharTypeInfo]
      if (v != null) new Varchar(v.toString, ti.getLength)
      else null

    case OdpsType.CHAR => (v: Any) =>
      val ti = t.asInstanceOf[CharTypeInfo]
      if (v != null) new Char(v.toString, ti.getLength)
      else null

    case OdpsType.DECIMAL => (v: Any) =>
      val ti = t.asInstanceOf[DecimalTypeInfo]
      // setScale without a rounding mode: throws ArithmeticException if the
      // value cannot be represented exactly at the column's scale.
      if (v != null) v.asInstanceOf[Decimal].toJavaBigDecimal.setScale(ti.getScale)
      else null

    case OdpsType.BINARY => (v: Any) =>
      // Guard against null like every other reference-typed branch.
      if (v != null) new Binary(v.asInstanceOf[Array[Byte]])
      else null

    case OdpsType.ARRAY => (v: Any) =>
      val ti = t.asInstanceOf[ArrayTypeInfo]
      if (v != null) {
        val elementType = ti.getElementTypeInfo
        // Build the element converter once per array, not once per element.
        val elementToOdps = sparkData2OdpsData(elementType)
        // ArrayData is the common base of UnsafeArrayData and the other
        // array implementations, so any of them is accepted here.
        v.asInstanceOf[org.apache.spark.sql.catalyst.util.ArrayData]
          .toArray[Object](odpsType2SparkType(elementType))
          .map(elementToOdps)
          .toList.asJava
      } else null

    case OdpsType.MAP => (v: Any) =>
      val ti = t.asInstanceOf[MapTypeInfo]
      if (v != null) {
        val mapData = v.asInstanceOf[org.apache.spark.sql.catalyst.util.MapData]
        val keyToOdps = sparkData2OdpsData(ti.getKeyTypeInfo)
        val valueToOdps = sparkData2OdpsData(ti.getValueTypeInfo)
        val keys = mapData.keyArray.toArray[Object](odpsType2SparkType(ti.getKeyTypeInfo))
        val values = mapData.valueArray.toArray[Object](odpsType2SparkType(ti.getValueTypeInfo))
        val m = new java.util.HashMap[Object, Object]
        // Keys and values are stored as two parallel arrays; pair them by index.
        keys.zip(values).foreach { case (key, value) =>
          m.put(keyToOdps(key), valueToOdps(value))
        }
        m
      } else null

    case OdpsType.STRUCT => (v: Any) =>
      val ti = t.asInstanceOf[StructTypeInfo]
      if (v != null) {
        val row = v.asInstanceOf[org.apache.spark.sql.catalyst.InternalRow]
        val fieldTypes = ti.getFieldTypeInfos
        // Convert field i with the i-th declared field type; stop at the
        // shorter of the two so a mismatch never indexes out of range.
        val fieldCount = math.min(row.numFields, fieldTypes.size)
        val fields = (0 until fieldCount).map { i =>
          val fieldType = fieldTypes.get(i)
          sparkData2OdpsData(fieldType)(row.get(i, odpsType2SparkType(fieldType)))
        }.toList.asJava
        new SimpleStruct(ti, fields)
      } else null
  }
}