protected[odps] def sparkData2OdpsData(t: TypeInfo): Any => AnyRef

in spark-datasource-v2.3/src/main/scala/org/apache/spark/sql/odps/converter/TypesConverter.scala [245:314]
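
Given an ODPS TypeInfo, this helper builds a closure that converts a single Spark internal value into the matching ODPS Java object: boxed primitives, java.util.Date, java.sql.Date/Timestamp, Varchar/Char, Binary, java.util.List, java.util.Map, or SimpleStruct. Complex types recurse through the same function for their element, key/value, and field types, and null inputs map to null for reference types.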


  protected[odps] def sparkData2OdpsData(t: TypeInfo): Any => AnyRef = {
    t.getOdpsType match {
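      // Boxed primitives pass through as their java.lang wrappers; a null input casts to a null box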
      case OdpsType.BOOLEAN => v: Any => v.asInstanceOf[java.lang.Boolean]
      case OdpsType.DOUBLE => v: Any => v.asInstanceOf[java.lang.Double]
      case OdpsType.BIGINT => v: Any => v.asInstanceOf[java.lang.Long]
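      // Spark TimestampType values are microseconds since the epoch; java.util.Date takes milliseconds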
      case OdpsType.DATETIME => v: Any =>
        if (v != null) new java.util.Date(v.asInstanceOf[Long]/1000)
        else null
      case OdpsType.STRING => v: Any =>
        if (v != null) v.asInstanceOf[String]
        else null
      case OdpsType.DECIMAL => v: Any =>
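        // Normalize the scale to the declared ODPS decimal scale (java.math.BigDecimal.setScale)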
        val ti = t.asInstanceOf[DecimalTypeInfo]
        if (v != null) new BigDecimal(v.asInstanceOf[Decimal].toString).setScale(ti.getScale)
        else null
      case OdpsType.VARCHAR => v: Any =>
        val ti = t.asInstanceOf[VarcharTypeInfo]
        if (v != null) new Varchar(v.asInstanceOf[String], ti.getLength)
        else null
      case OdpsType.CHAR => v: Any =>
        val ti = t.asInstanceOf[CharTypeInfo]
        if (v != null) new Char(v.asInstanceOf[String], ti.getLength)
        else null
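      // Spark DateType values are days since the epoch; scale to milliseconds for java.sql.Date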
      case OdpsType.DATE => v: Any =>
        if (v != null) new java.sql.Date(v.asInstanceOf[Int].toLong * (3600 * 24 * 1000))
        else null
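      // Microseconds to milliseconds; sub-millisecond precision is dropped (nanos are never set)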
      case OdpsType.TIMESTAMP => v: Any =>
        if (v != null) new java.sql.Timestamp(v.asInstanceOf[Long]/1000)
        else null
      case OdpsType.FLOAT => v: Any => v.asInstanceOf[java.lang.Float]
      case OdpsType.INT => v: Any => v.asInstanceOf[java.lang.Integer]
      case OdpsType.SMALLINT => v: Any => v.asInstanceOf[java.lang.Short]
      case OdpsType.TINYINT => v: Any => v.asInstanceOf[java.lang.Byte]
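      // Recursively convert every element, then expose the result as a java.util.List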
      case OdpsType.ARRAY => v: Any =>
        val ti = t.asInstanceOf[ArrayTypeInfo]
        if (v != null) {
          v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeArrayData]
            .toArray[Object](odpsType2SparkType(ti.getElementTypeInfo))
            .map(e => sparkData2OdpsData(ti.getElementTypeInfo)(e)).toList.asJava
        } else null
      case OdpsType.BINARY => v: Any =>
        // Wrap raw bytes; null passes through as null, matching the other reference types
        if (v != null) new Binary(v.asInstanceOf[Array[Byte]])
        else null
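      // Convert keys and values pairwise with the converters for the declared key/value types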
      case OdpsType.MAP => v: Any =>
        val ti = t.asInstanceOf[MapTypeInfo]
        if (v != null) {
          val m = new java.util.HashMap[Object, Object]
          val mapData = v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeMapData]
          mapData.keyArray.toArray[Object](odpsType2SparkType(ti.getKeyTypeInfo))
            .zip(
              mapData.valueArray.toArray[Object](
                odpsType2SparkType(ti.getValueTypeInfo)))
            .foreach(p => m.put(
              sparkData2OdpsData(ti.getKeyTypeInfo)(p._1.asInstanceOf[Object]),
              sparkData2OdpsData(ti.getValueTypeInfo)(p._2.asInstanceOf[Object])
                .asInstanceOf[Object])
            )
          m
        } else null
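      // Convert each struct field by ordinal using its declared field TypeInfo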
      case OdpsType.STRUCT => v: Any => {
        val ti = t.asInstanceOf[StructTypeInfo]
        if (v != null) {
          val r = v.asInstanceOf[org.apache.spark.sql.catalyst.expressions.UnsafeRow]
          val l = (0 until r.numFields).zip(ti.getFieldTypeInfos.toArray()).map(p =>
            sparkData2OdpsData(p._2.asInstanceOf[TypeInfo])(r.get(p._1,
              odpsType2SparkType(p._2.asInstanceOf[TypeInfo])))
          ).toList.asJava
          new SimpleStruct(ti, l)
        } else null
      }
    }
  }
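
A minimal usage sketch (hypothetical, not from the source): because the method is protected[odps], the caller below is assumed to live in the org.apache.spark.sql.odps package, and TypesConverter is assumed to be the enclosing object. TypeInfoFactory comes from the ODPS SDK.

  // Hypothetical sketch exercising two converters produced by sparkData2OdpsData.
  import com.aliyun.odps.`type`.TypeInfoFactory
  import org.apache.spark.sql.types.Decimal

  // BIGINT: a Scala Long is boxed to java.lang.Long.
  val toOdpsLong = TypesConverter.sparkData2OdpsData(TypeInfoFactory.BIGINT)
  val boxed = toOdpsLong(42L)                // java.lang.Long = 42

  // DECIMAL(18, 2): Spark's Decimal becomes a java.math.BigDecimal with scale 2.
  val decimalTi = TypeInfoFactory.getDecimalTypeInfo(18, 2)
  val toOdpsDecimal = TypesConverter.sparkData2OdpsData(decimalTi)
  val bd = toOdpsDecimal(Decimal("12.34"))   // java.math.BigDecimal = 12.34

  // Null reference-typed inputs pass through as null.
  val nothing = toOdpsDecimal(null)          // null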