in spark-connector/common/src/main/scala/org/apache/spark/sql/odps/OdpsUtils.scala [127:210]
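/**
 * Returns a converter that turns a value in Spark's internal (Catalyst) representation
 * into the corresponding ODPS SDK Java object for the given ODPS TypeInfo.
 * Null inputs are passed through as null; ARRAY, MAP and STRUCT values are converted
 * element by element via recursive calls.
 */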
def sparkData2OdpsData(t: TypeInfo): Object => AnyRef = {
  t.getOdpsType match {
    case OdpsType.BOOLEAN => v: Object => v.asInstanceOf[java.lang.Boolean]
    case OdpsType.DOUBLE => v: Object => v.asInstanceOf[java.lang.Double]
    case OdpsType.BIGINT => v: Object => v.asInstanceOf[java.lang.Long]
    case OdpsType.DATETIME => v: Object =>
      // Spark keeps timestamps as microseconds since the epoch; java.util.Date takes milliseconds.
      if (v != null) {
        new java.util.Date(Math.floorDiv(v.asInstanceOf[Long], MICROS_PER_MILLIS))
      } else null
    case OdpsType.STRING => v: Object =>
      if (v != null) v.asInstanceOf[UTF8String].toString
      else null
    case OdpsType.DECIMAL => v: Object =>
      val ti = t.asInstanceOf[DecimalTypeInfo]
      // Rescale to the declared ODPS decimal scale.
      if (v != null) new BigDecimal(v.asInstanceOf[Decimal].toString).setScale(ti.getScale)
      else null
    case OdpsType.VARCHAR => v: Object =>
      val ti = t.asInstanceOf[VarcharTypeInfo]
      if (v != null) new Varchar(v.asInstanceOf[UTF8String].toString, ti.getLength)
      else null
    case OdpsType.CHAR => v: Object =>
      val ti = t.asInstanceOf[CharTypeInfo]
      if (v != null) new Char(v.asInstanceOf[UTF8String].toString, ti.getLength)
      else null
    case OdpsType.DATE => v: Object =>
      // TODO: use odps-sdk setDateAsLocalDate
      // Spark stores DATE as days since the epoch; java.sql.Date expects epoch milliseconds,
      // so scale days -> seconds -> milliseconds.
      if (v != null) new java.sql.Date(v.asInstanceOf[Int].toLong * SECONDS_PER_DAY * 1000L)
      else null
    case OdpsType.TIMESTAMP => v: Object =>
      if (v != null) {
        // Split the microsecond epoch value into whole seconds and a non-negative
        // microsecond remainder, then carry the remainder into the nanos field.
        val microSeconds = v.asInstanceOf[Long]
        var seconds = microSeconds / MICROS_PER_SECOND
        var micros = microSeconds % MICROS_PER_SECOND
        if (micros < 0) {
          micros += MICROS_PER_SECOND
          seconds -= 1
        }
        val ts = new Timestamp(seconds * 1000)
        ts.setNanos(micros.toInt * 1000)
        ts
      } else null
    case OdpsType.FLOAT => v: Object => v.asInstanceOf[java.lang.Float]
    case OdpsType.INT => v: Object => v.asInstanceOf[java.lang.Integer]
    case OdpsType.SMALLINT => v: Object => v.asInstanceOf[java.lang.Short]
    case OdpsType.TINYINT => v: Object => v.asInstanceOf[java.lang.Byte]
    case OdpsType.ARRAY => v: Object =>
      val ti = t.asInstanceOf[ArrayTypeInfo]
      if (v != null) {
        // Convert each element recursively and hand the ODPS SDK a java.util.List.
        v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeArrayData]
          .toArray[Object](typeInfo2Type(ti.getElementTypeInfo))
          .map(e => sparkData2OdpsData(ti.getElementTypeInfo)(e)).toList.asJava
      } else null
    case OdpsType.BINARY => v: Object =>
      if (v != null) new Binary(v.asInstanceOf[Array[Byte]])
      else null
    case OdpsType.MAP => v: Object =>
      val ti = t.asInstanceOf[MapTypeInfo]
      if (v != null) {
        // Pair up the key and value arrays and convert both sides recursively.
        val m = new java.util.HashMap[Object, Object]
        val mapData = v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeMapData]
        mapData.keyArray.toArray[Object](typeInfo2Type(ti.getKeyTypeInfo))
          .zip(
            mapData.valueArray.toArray[Object](
              typeInfo2Type(ti.getValueTypeInfo)))
          .foreach(p => m.put(
            sparkData2OdpsData(ti.getKeyTypeInfo)(p._1),
            sparkData2OdpsData(ti.getValueTypeInfo)(p._2)
              .asInstanceOf[Object])
          )
        m
      } else null
    case OdpsType.STRUCT => v: Object => {
      val ti = t.asInstanceOf[StructTypeInfo]
      if (v != null) {
        // Convert each struct field by position, paired with its declared ODPS field type.
        val r = v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow]
        val l = (0 until r.numFields).zip(ti.getFieldTypeInfos.toArray()).map(p =>
          sparkData2OdpsData(p._2.asInstanceOf[TypeInfo])(r.get(p._1,
            typeInfo2Type(p._2.asInstanceOf[TypeInfo])))
        ).toList.asJava
        new SimpleStruct(ti, l)
      } else null
    }
  }
}
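// A minimal usage sketch (not part of the original file), assuming these helpers live on an
// enclosing OdpsUtils object and that `record` (com.aliyun.odps.data.Record), a Catalyst
// InternalRow `row`, a column index `i` and that column's ODPS `typeInfo` are in scope:
//
//   val convert = sparkData2OdpsData(typeInfo)
//   val sparkValue = row.get(i, typeInfo2Type(typeInfo))
//   record.set(i, convert(sparkValue))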