in spark-connector/common/src/main/scala/org/apache/spark/sql/odps/OdpsUtils.scala [40:124]
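// Wraps a converter so that a null ODPS value passes through as a Spark null
// instead of reaching the conversion function.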
private def nullSafeEval(func: Object => Any): Object => Any =
  (v: Object) => if (v ne null) func(v) else null

// Build a converter from an ODPS value of the given TypeInfo to Spark's
// internal (Catalyst) representation.
def odpsData2SparkData(t: TypeInfo): Object => Any = {
  val func = t.getOdpsType match {
    case OdpsType.BOOLEAN => (v: Object) => v.asInstanceOf[java.lang.Boolean]
    case OdpsType.DOUBLE => (v: Object) => v.asInstanceOf[java.lang.Double]
    case OdpsType.BIGINT => (v: Object) => v.asInstanceOf[java.lang.Long]
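    // STRING may arrive as a java.lang.String or as raw UTF-8 bytes; both
    // become Spark's UTF8String.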
    case OdpsType.STRING => (v: Object) => v match {
      case str: String =>
        UTF8String.fromString(str)
      case array: Array[Byte] =>
        UTF8String.fromBytes(array)
    }
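    // Precision 54 / scale 18 is the fixed shape reported for legacy (v1)
    // ODPS decimals; map those to the connector's default precision/scale
    // rather than the declared shape.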
    case OdpsType.DECIMAL => (v: Object) => {
      val ti = t.asInstanceOf[DecimalTypeInfo]
      if (ti.getPrecision == 54 && ti.getScale == 18) {
        (new Decimal).set(v.asInstanceOf[java.math.BigDecimal], ODPS_DECIMAL_DEFAULT_PRECISION, ODPS_DECIMAL_DEFAULT_SCALE)
      } else {
        (new Decimal).set(v.asInstanceOf[java.math.BigDecimal], ti.getPrecision, ti.getScale)
      }
    }
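    // VARCHAR: extract the string payload as a UTF8String.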
    case OdpsType.VARCHAR => (v: Object) => {
      val varchar = v.asInstanceOf[Varchar]
      UTF8String.fromString(varchar.getValue.substring(0, varchar.length()))
    }
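    // CHAR is fixed-width and blank-padded; drop the trailing padding.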
    case OdpsType.CHAR => (v: Object) => {
      val char = v.asInstanceOf[Char]
      UTF8String.fromString(char.getValue.substring(0, char.length())).trimRight()
    }
    case OdpsType.DATE => (v: Object) => {
      v match {
        case date: LocalDate =>
          DateTimeUtils.localDateToDays(date)
        case _ =>
          DateUtils.getDayOffset(v.asInstanceOf[Date]).toInt
      }
    }
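    // DATETIME becomes microseconds since the epoch; the millisecond path
    // uses multiplyExact so an out-of-range value fails loudly instead of
    // silently overflowing.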
    case OdpsType.DATETIME => (v: Object) => {
      v match {
        case zt: java.time.ZonedDateTime =>
          val instant = Instant.from(zt)
          DateTimeUtils.instantToMicros(instant)
        case _ =>
          Math.multiplyExact(v.asInstanceOf[java.util.Date].getTime, MICROS_PER_MILLIS)
      }
    }
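    // TIMESTAMP also becomes microseconds, from either a java.time.Instant
    // or a java.sql.Timestamp.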
    case OdpsType.TIMESTAMP => (v: Object) => {
      v match {
        case ts: java.time.Instant =>
          DateTimeUtils.instantToMicros(ts)
        case _ =>
          DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[java.sql.Timestamp])
      }
    }
    case OdpsType.FLOAT => (v: Object) => v.asInstanceOf[java.lang.Float]
    case OdpsType.INT => (v: Object) => v.asInstanceOf[java.lang.Integer]
    case OdpsType.SMALLINT => (v: Object) => v.asInstanceOf[java.lang.Short]
    case OdpsType.TINYINT => (v: Object) => v.asInstanceOf[java.lang.Byte]
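    // ARRAY: convert each element recursively with the element type's converter.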
    case OdpsType.ARRAY => (v: Object) => {
      val array = v.asInstanceOf[java.util.ArrayList[Object]]
      new GenericArrayData(array.toArray().
        map(odpsData2SparkData(t.asInstanceOf[ArrayTypeInfo].getElementTypeInfo)(_)))
    }
    case OdpsType.BINARY => (v: Object) => v.asInstanceOf[Binary].data()
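    // MAP: materialize the key set once, then build parallel key/value arrays
    // so the entries stay aligned.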
    case OdpsType.MAP => (v: Object) => {
      val m = v.asInstanceOf[java.util.HashMap[Object, Object]]
      val keyArray = m.keySet().toArray()
      new ArrayBasedMapData(
        new GenericArrayData(keyArray.
          map(odpsData2SparkData(t.asInstanceOf[MapTypeInfo].getKeyTypeInfo)(_))),
        new GenericArrayData(keyArray.map(m.get(_)).
          map(odpsData2SparkData(t.asInstanceOf[MapTypeInfo].getValueTypeInfo)(_)))
      )
    }
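    // STRUCT: convert each field by its declared TypeInfo and pack the
    // results into an InternalRow.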
    case OdpsType.STRUCT => (v: Object) => {
      val struct = v.asInstanceOf[com.aliyun.odps.data.Struct]
      org.apache.spark.sql.catalyst.InternalRow
        .fromSeq(struct.getFieldValues.asScala.zipWithIndex
          .map(x => odpsData2SparkData(struct.getFieldTypeInfo(x._2))(x._1)))
    }
  }
  nullSafeEval(func)
}
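
// ---------------------------------------------------------------------------
// Usage sketch (illustrative; not part of the original file). Assuming the
// ODPS SDK's TypeInfoFactory constants, a caller converts one column value
// like this:
//
//   import com.aliyun.odps.`type`.TypeInfoFactory
//
//   val toSpark = odpsData2SparkData(TypeInfoFactory.STRING)
//   toSpark("hello")   // => UTF8String "hello"
//   toSpark(null)      // => null, courtesy of nullSafeEval
//
//   val toDays = odpsData2SparkData(TypeInfoFactory.DATE)
//   toDays(java.time.LocalDate.of(1970, 1, 2))   // => 1 (days since epoch)
// ---------------------------------------------------------------------------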