protected def wrapperFor()

in spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala [268:440]


  protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match {
    case _ if dataType.isInstanceOf[UserDefinedType[_]] =>
      val sqlType = dataType.asInstanceOf[UserDefinedType[_]].sqlType
      wrapperFor(oi, sqlType)
    case x: ConstantObjectInspector =>
      (o: Any) =>
        x.getWritableConstantValue
    case x: PrimitiveObjectInspector => x match {
      // TODO we don't support the HiveVarcharObjectInspector yet.
      case _: StringObjectInspector if x.preferWritable() =>
        withNullSafe(o => getStringWritable(o))
      case _: StringObjectInspector =>
        withNullSafe(o => o.asInstanceOf[UTF8String].toString())
      case _: IntObjectInspector if x.preferWritable() =>
        withNullSafe(o => getIntWritable(o))
      case _: IntObjectInspector =>
        withNullSafe(o => o.asInstanceOf[java.lang.Integer])
      case _: BooleanObjectInspector if x.preferWritable() =>
        withNullSafe(o => getBooleanWritable(o))
      case _: BooleanObjectInspector =>
        withNullSafe(o => o.asInstanceOf[java.lang.Boolean])
      case _: FloatObjectInspector if x.preferWritable() =>
        withNullSafe(o => getFloatWritable(o))
      case _: FloatObjectInspector =>
        withNullSafe(o => o.asInstanceOf[java.lang.Float])
      case _: DoubleObjectInspector if x.preferWritable() =>
        withNullSafe(o => getDoubleWritable(o))
      case _: DoubleObjectInspector =>
        withNullSafe(o => o.asInstanceOf[java.lang.Double])
      case _: LongObjectInspector if x.preferWritable() =>
        withNullSafe(o => getLongWritable(o))
      case _: LongObjectInspector =>
        withNullSafe(o => o.asInstanceOf[java.lang.Long])
      case _: ShortObjectInspector if x.preferWritable() =>
        withNullSafe(o => getShortWritable(o))
      case _: ShortObjectInspector =>
        withNullSafe(o => o.asInstanceOf[java.lang.Short])
      case _: ByteObjectInspector if x.preferWritable() =>
        withNullSafe(o => getByteWritable(o))
      case _: ByteObjectInspector =>
        withNullSafe(o => o.asInstanceOf[java.lang.Byte])
        // To spark HiveVarchar and HiveChar are same as string
      case _: HiveVarcharObjectInspector if x.preferWritable() =>
        withNullSafe(o => getStringWritable(o))
      case _: HiveVarcharObjectInspector =>
        withNullSafe { o =>
            val s = o.asInstanceOf[UTF8String].toString
            new HiveVarchar(s, s.length)
        }
      case _: HiveCharObjectInspector if x.preferWritable() =>
        withNullSafe(o => getStringWritable(o))
      case _: HiveCharObjectInspector =>
        withNullSafe { o =>
            val s = o.asInstanceOf[UTF8String].toString
            new HiveChar(s, s.length)
          }
      case _: JavaHiveDecimalObjectInspector =>
        withNullSafe(o =>
          HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal))
      case _: JavaDateObjectInspector =>
        withNullSafe(o =>
            DateTimeUtils.toJavaDate(o.asInstanceOf[Int]))
      case _: JavaTimestampObjectInspector =>
        withNullSafe(o =>
            DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long]))
      case _: HiveDecimalObjectInspector if x.preferWritable() =>
        withNullSafe(o => getDecimalWritable(o.asInstanceOf[Decimal]))
      case _: HiveDecimalObjectInspector =>
        withNullSafe(o =>
            HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal))
      case _: BinaryObjectInspector if x.preferWritable() =>
        withNullSafe(o => getBinaryWritable(o))
      case _: BinaryObjectInspector =>
        withNullSafe(o => o.asInstanceOf[Array[Byte]])
      case _: DateObjectInspector if x.preferWritable() =>
        withNullSafe(o => getDateWritable(o))
      case _: DateObjectInspector =>
        withNullSafe(o => DateTimeUtils.toJavaDate(o.asInstanceOf[Int]))
      case _: TimestampObjectInspector if x.preferWritable() =>
        withNullSafe(o => getTimestampWritable(o))
      case _: TimestampObjectInspector =>
        withNullSafe(o => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long]))
      case _: HiveIntervalDayTimeObjectInspector  if x.preferWritable() =>
        withNullSafe(o => getHiveIntervalDayTimeWritable(o))
      case _: HiveIntervalDayTimeObjectInspector =>
        withNullSafe(o => {
          val duration = IntervalUtils.microsToDuration(o.asInstanceOf[Long])
          new HiveIntervalDayTime(duration.getSeconds, duration.getNano)
        })
      case _: HiveIntervalYearMonthObjectInspector if x.preferWritable() =>
        withNullSafe(o => getHiveIntervalYearMonthWritable(o))
      case _: HiveIntervalYearMonthObjectInspector =>
        withNullSafe(o => new HiveIntervalYearMonth(o.asInstanceOf[Int]))
      case _: VoidObjectInspector =>
        (_: Any) => null // always be null for void object inspector
    }

    case soi: StandardStructObjectInspector =>
      val schema = dataType.asInstanceOf[StructType]
      val wrappers = soi.getAllStructFieldRefs.asScala.zip(schema.fields).map {
        case (ref, field) => wrapperFor(ref.getFieldObjectInspector, field.dataType)
      }
      withNullSafe { o =>
        val struct = soi.create()
        val row = o.asInstanceOf[InternalRow]
        soi.getAllStructFieldRefs.asScala.zip(wrappers).zipWithIndex.foreach {
          case ((field, wrapper), i) =>
            soi.setStructFieldData(struct, field, wrapper(row.get(i, schema(i).dataType)))
        }
        struct
      }

    case ssoi: SettableStructObjectInspector =>
      val structType = dataType.asInstanceOf[StructType]
      val wrappers = ssoi.getAllStructFieldRefs.asScala.zip(structType).map {
        case (ref, tpe) => wrapperFor(ref.getFieldObjectInspector, tpe.dataType)
      }
      withNullSafe { o =>
        val row = o.asInstanceOf[InternalRow]
        // 1. create the pojo (most likely) object
        val result = ssoi.create()
        ssoi.getAllStructFieldRefs.asScala.zip(wrappers).zipWithIndex.foreach {
          case ((field, wrapper), i) =>
            val tpe = structType(i).dataType
            ssoi.setStructFieldData(
            result,
            field,
            wrapper(row.get(i, tpe)).asInstanceOf[AnyRef])
        }
        result
      }

    case soi: StructObjectInspector =>
      val structType = dataType.asInstanceOf[StructType]
      val wrappers = soi.getAllStructFieldRefs.asScala.zip(structType).map {
        case (ref, tpe) => wrapperFor(ref.getFieldObjectInspector, tpe.dataType)
      }
      withNullSafe { o =>
        val row = o.asInstanceOf[InternalRow]
        val result = new java.util.ArrayList[AnyRef](wrappers.size)
        soi.getAllStructFieldRefs.asScala.zip(wrappers).zipWithIndex.foreach {
          case ((field, wrapper), i) =>
          val tpe = structType(i).dataType
          result.add(wrapper(row.get(i, tpe)).asInstanceOf[AnyRef])
        }
        result
      }

    case loi: ListObjectInspector =>
      val elementType = dataType.asInstanceOf[ArrayType].elementType
      val wrapper = wrapperFor(loi.getListElementObjectInspector, elementType)
      withNullSafe { o =>
        val array = o.asInstanceOf[ArrayData]
        val values = new java.util.ArrayList[Any](array.numElements())
        array.foreach(elementType, (_, e) => values.add(wrapper(e)))
        values
      }

    case moi: MapObjectInspector =>
      val mt = dataType.asInstanceOf[MapType]
      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector, mt.keyType)
      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector, mt.valueType)
      withNullSafe { o =>
          val map = o.asInstanceOf[MapData]
          val jmap = new java.util.HashMap[Any, Any](map.numElements())
          map.foreach(mt.keyType, mt.valueType, (k, v) =>
            jmap.put(keyWrapper(k), valueWrapper(v)))
          jmap
        }

    case _ =>
      identity[Any]
  }