def sparkData2OdpsData()

in spark-connector/common/src/main/scala/org/apache/spark/sql/odps/OdpsUtils.scala [127:210]


  /**
   * Builds a converter that turns a Spark-side value into the Java object
   * handed to ODPS for a column of type `t`.
   *
   * The ODPS type is dispatched on once, when this method is called; the
   * returned function only performs the per-value conversion. Converters and
   * Spark data types for nested ARRAY / MAP / STRUCT members are likewise
   * built once here rather than for every element of every value. As a
   * consequence, an ODPS type that has no case in the match below (including
   * one nested inside a complex type) raises `scala.MatchError` while the
   * converter is being built.
   *
   * Every returned converter maps a null input to null.
   *
   * @param t ODPS type information describing the target column
   * @return a function from the Spark-side value to its ODPS-side counterpart
   */
  def sparkData2OdpsData(t: TypeInfo): Object => AnyRef = {
    t.getOdpsType match {
      // Boxed primitives are passed through unchanged; casting null yields null.
      case OdpsType.BOOLEAN => v: Object => v.asInstanceOf[java.lang.Boolean]
      case OdpsType.DOUBLE => v: Object => v.asInstanceOf[java.lang.Double]
      case OdpsType.BIGINT => v: Object => v.asInstanceOf[java.lang.Long]
      case OdpsType.FLOAT => v: Object => v.asInstanceOf[java.lang.Float]
      case OdpsType.INT => v: Object => v.asInstanceOf[java.lang.Integer]
      case OdpsType.SMALLINT => v: Object => v.asInstanceOf[java.lang.Short]
      case OdpsType.TINYINT => v: Object => v.asInstanceOf[java.lang.Byte]

      case OdpsType.DATETIME => v: Object =>
        // floorDiv (rather than `/`) rounds towards negative infinity, so
        // negative inputs are not rounded up to a later instant.
        if (v != null) {
          new java.util.Date(Math.floorDiv(v.asInstanceOf[Long], MICROS_PER_MILLIS))
        } else null

      case OdpsType.STRING => v: Object =>
        if (v != null) v.asInstanceOf[UTF8String].toString
        else null

      case OdpsType.DECIMAL =>
        val ti = t.asInstanceOf[DecimalTypeInfo]
        (v: Object) =>
          // setScale without a rounding mode throws ArithmeticException if the
          // value cannot be represented exactly at the column's scale.
          if (v != null) {
            new BigDecimal(v.asInstanceOf[Decimal].toString).setScale(ti.getScale)
          } else null

      case OdpsType.VARCHAR =>
        val ti = t.asInstanceOf[VarcharTypeInfo]
        (v: Object) =>
          if (v != null) new Varchar(v.asInstanceOf[UTF8String].toString, ti.getLength)
          else null

      case OdpsType.CHAR =>
        val ti = t.asInstanceOf[CharTypeInfo]
        (v: Object) =>
          if (v != null) new Char(v.asInstanceOf[UTF8String].toString, ti.getLength)
          else null

      case OdpsType.DATE => v: Object =>
        // TODO: use odps-sdk setDateAsLocalDate
        // NOTE(review): java.sql.Date(long) takes epoch milliseconds; confirm
        // that scaling by SECONDS_PER_DAY produces the unit the ODPS side expects.
        if (v != null) new java.sql.Date(v.asInstanceOf[Int].toLong * SECONDS_PER_DAY)
        else null

      case OdpsType.TIMESTAMP => v: Object =>
        if (v != null) {
          val microSeconds = v.asInstanceOf[Long]
          // Floor semantics keep the sub-second part non-negative for inputs
          // below zero, which Timestamp.setNanos requires.
          val seconds = Math.floorDiv(microSeconds, MICROS_PER_SECOND)
          val micros = Math.floorMod(microSeconds, MICROS_PER_SECOND)
          val ts = new Timestamp(seconds * 1000)
          ts.setNanos(micros.toInt * 1000)
          ts
        } else null

      case OdpsType.BINARY => v: Object =>
        // Guard against null like every other branch instead of wrapping it.
        if (v != null) new Binary(v.asInstanceOf[Array[Byte]])
        else null

      case OdpsType.ARRAY =>
        val ti = t.asInstanceOf[ArrayTypeInfo]
        // Built once per converter, not once per array element.
        val elementType = typeInfo2Type(ti.getElementTypeInfo)
        val convertElement = sparkData2OdpsData(ti.getElementTypeInfo)
        (v: Object) =>
          if (v != null) {
            v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeArrayData]
              .toArray[Object](elementType)
              .map(e => convertElement(e)).toList.asJava
          } else null

      case OdpsType.MAP =>
        val ti = t.asInstanceOf[MapTypeInfo]
        // Built once per converter, not once per map entry.
        val keyType = typeInfo2Type(ti.getKeyTypeInfo)
        val valueType = typeInfo2Type(ti.getValueTypeInfo)
        val convertKey = sparkData2OdpsData(ti.getKeyTypeInfo)
        val convertValue = sparkData2OdpsData(ti.getValueTypeInfo)
        (v: Object) =>
          if (v != null) {
            val mapData = v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeMapData]
            val m = new java.util.HashMap[Object, Object]
            // Keys and values are stored as two parallel arrays; pair them by position.
            mapData.keyArray.toArray[Object](keyType)
              .zip(mapData.valueArray.toArray[Object](valueType))
              .foreach { case (key, value) => m.put(convertKey(key), convertValue(value)) }
            m
          } else null

      case OdpsType.STRUCT =>
        val ti = t.asInstanceOf[StructTypeInfo]
        // One (Spark data type, converter) pair per struct field, in field
        // order, built once per converter rather than once per row.
        val fields = ti.getFieldTypeInfos.toArray().map { f =>
          val fieldInfo = f.asInstanceOf[TypeInfo]
          (typeInfo2Type(fieldInfo), sparkData2OdpsData(fieldInfo))
        }
        (v: Object) =>
          if (v != null) {
            val r = v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow]
            // zip stops at the shorter side, so only ordinals that exist both in
            // the row and in the type description are converted.
            val l = (0 until r.numFields).zip(fields).map { case (i, (dataType, convert)) =>
              convert(r.get(i, dataType))
            }.toList.asJava
            new SimpleStruct(ti, l)
          } else null
    }
  }