private def nullSafeEval()

in spark-connector/common/src/main/scala/org/apache/spark/sql/odps/OdpsUtils.scala [40:124]


  /**
   * Wraps `func` so that it is only invoked for non-null arguments.
   *
   * @param func conversion to apply to non-null inputs
   * @return a function that returns null for a null input and `func(input)` otherwise
   */
  private def nullSafeEval(func: Object => Any): Object => Any = {
    case null => null
    case present => func(present)
  }

  /**
   * Builds a converter from an ODPS value of type `t` to the corresponding Spark
   * internal value (e.g. `UTF8String`, `Decimal`, `GenericArrayData`, `InternalRow`).
   *
   * Everything that depends only on the type is resolved once, when the converter is
   * built: the decimal precision/scale and the converters for array elements and map
   * keys/values. The returned function then reuses them for every value instead of
   * re-deriving them for each element of each row.
   *
   * @param t ODPS type description of the values the returned function will receive
   * @return a function mapping an ODPS value to its Spark representation; a null input
   *         yields null
   * @throws MatchError if `t`, or a nested array element / map key / map value type,
   *                    is not one of the ODPS types handled below
   */
  def odpsData2SparkData(t: TypeInfo): Object => Any = {
    val func = t.getOdpsType match {
      case OdpsType.BOOLEAN => (v: Object) => v.asInstanceOf[java.lang.Boolean]
      case OdpsType.DOUBLE => (v: Object) => v.asInstanceOf[java.lang.Double]
      case OdpsType.BIGINT => (v: Object) => v.asInstanceOf[java.lang.Long]
      case OdpsType.STRING => (v: Object) => v match {
        // STRING values are accepted either as a String or as raw bytes.
        case str: String =>
          UTF8String.fromString(str)
        case array: Array[Byte] =>
          UTF8String.fromBytes(array)
      }
      case OdpsType.DECIMAL =>
        val ti = t.asInstanceOf[DecimalTypeInfo]
        // A declared (54, 18) is replaced by the connector's default precision/scale.
        val useDefault = ti.getPrecision == 54 && ti.getScale == 18
        val precision = if (useDefault) ODPS_DECIMAL_DEFAULT_PRECISION else ti.getPrecision
        val scale = if (useDefault) ODPS_DECIMAL_DEFAULT_SCALE else ti.getScale
        (v: Object) =>
          (new Decimal).set(v.asInstanceOf[java.math.BigDecimal], precision, scale)
      case OdpsType.VARCHAR => (v: Object) => {
        val varchar = v.asInstanceOf[Varchar]
        UTF8String.fromString(varchar.getValue.substring(0, varchar.length()))
      }
      case OdpsType.CHAR => (v: Object) => {
        val char = v.asInstanceOf[Char]
        // Trailing whitespace is stripped from CHAR values.
        UTF8String.fromString(char.getValue.substring(0, char.length())).trimRight()
      }
      case OdpsType.DATE => (v: Object) => {
        v match {
          case date: LocalDate =>
            DateTimeUtils.localDateToDays(date)
          case _ =>
            DateUtils.getDayOffset(v.asInstanceOf[Date]).toInt
        }
      }
      case OdpsType.DATETIME => (v: Object) => {
        v match {
          case zt: java.time.ZonedDateTime =>
            DateTimeUtils.instantToMicros(Instant.from(zt))
          case _ =>
            // Epoch milliseconds -> microseconds; multiplyExact throws on overflow.
            Math.multiplyExact(v.asInstanceOf[java.util.Date].getTime, MICROS_PER_MILLIS)
        }
      }
      case OdpsType.TIMESTAMP => (v: Object) => {
        v match {
          case ts: java.time.Instant =>
            DateTimeUtils.instantToMicros(ts)
          case _ =>
            DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[java.sql.Timestamp])
        }
      }
      case OdpsType.FLOAT => (v: Object) => v.asInstanceOf[java.lang.Float]
      case OdpsType.INT => (v: Object) => v.asInstanceOf[java.lang.Integer]
      case OdpsType.SMALLINT => (v: Object) => v.asInstanceOf[java.lang.Short]
      case OdpsType.TINYINT => (v: Object) => v.asInstanceOf[java.lang.Byte]
      case OdpsType.ARRAY =>
        // Resolved once per array type rather than once per element.
        val elementConverter =
          odpsData2SparkData(t.asInstanceOf[ArrayTypeInfo].getElementTypeInfo)
        (v: Object) => {
          // Only List operations are needed, so accept any java.util.List implementation.
          val elements = v.asInstanceOf[java.util.List[Object]]
          val converted: Array[Any] = elements.toArray().map(elementConverter)
          new GenericArrayData(converted)
        }
      case OdpsType.BINARY => (v: Object) => v.asInstanceOf[Binary].data()
      case OdpsType.MAP =>
        // Resolved once per map type rather than once per entry.
        val mapType = t.asInstanceOf[MapTypeInfo]
        val keyConverter = odpsData2SparkData(mapType.getKeyTypeInfo)
        val valueConverter = odpsData2SparkData(mapType.getValueTypeInfo)
        (v: Object) => {
          // Only Map operations are needed, so accept any java.util.Map implementation.
          val m = v.asInstanceOf[java.util.Map[Object, Object]]
          // Keys are snapshotted first so keys and values are emitted in the same order.
          val keyArray = m.keySet().toArray()
          val keys: Array[Any] = keyArray.map(keyConverter)
          val values: Array[Any] = keyArray.map(k => valueConverter(m.get(k)))
          new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values))
        }
      case OdpsType.STRUCT => (v: Object) => {
        val struct = v.asInstanceOf[com.aliyun.odps.data.Struct]
        // Field types are read from the struct value itself, so the field converters
        // are resolved per value here.
        org.apache.spark.sql.catalyst.InternalRow
          .fromSeq(struct.getFieldValues.asScala.zipWithIndex
            .map(x => odpsData2SparkData(struct.getFieldTypeInfo(x._2))(x._1)))
      }
    }
    nullSafeEval(func)
  }