in spark-connector/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala [268:440]
/**
 * Returns a function that converts a Catalyst value of type `dataType` into the object expected
 * by the Hive inspector `oi`.
 *
 * The non-writable branches below cast their input to Catalyst's internal representations:
 * `UTF8String` for string/varchar/char, `Int` for dates and year-month intervals, `Long` for
 * timestamps and day-time intervals, `Decimal` for decimals, `Array[Byte]` for binary,
 * `InternalRow` for structs, `ArrayData` for arrays and `MapData` for maps.
 *
 * Conversion rules:
 *  - A `UserDefinedType` is converted as its underlying `sqlType`.
 *  - A `ConstantObjectInspector` ignores its input and always yields its writable constant value.
 *  - A primitive inspector whose `preferWritable()` is true gets a writable from the matching
 *    `get*Writable` helper; otherwise a Java/Hive object is produced.
 *  - Struct, list and map inspectors are converted recursively. Child wrappers (and, for
 *    structs, field refs and field types) are resolved once here, not on every call.
 *  - Struct inspectors pair their field refs with the schema's fields by position. If the
 *    counts differ, the surplus fields on the longer side are ignored.
 *  - Any other inspector gets `identity`.
 *
 * The first matching case wins, so order is significant:
 *  - every `preferWritable()`-guarded case precedes its unguarded counterpart;
 *  - specific inspectors (the `Java*` primitive inspectors, `StandardStructObjectInspector`,
 *    `SettableStructObjectInspector`) are listed before broader ones.
 *
 * A `PrimitiveObjectInspector` subtype not listed below makes this method throw `MatchError`
 * when the wrapper is built. Null handling is delegated to `withNullSafe`, which is defined
 * elsewhere (presumably it returns null for null input without invoking the conversion).
 *
 * @param oi       inspector describing the Hive-side representation to produce
 * @param dataType Catalyst type of the values that will be passed to the returned function
 * @return a function converting one Catalyst value to its Hive representation
 */
protected def wrapperFor(oi: ObjectInspector, dataType: DataType): Any => Any = oi match {
  case _ if dataType.isInstanceOf[UserDefinedType[_]] =>
    val sqlType = dataType.asInstanceOf[UserDefinedType[_]].sqlType
    wrapperFor(oi, sqlType)
  case x: ConstantObjectInspector =>
    (o: Any) =>
      x.getWritableConstantValue
  case x: PrimitiveObjectInspector => x match {
    case _: StringObjectInspector if x.preferWritable() =>
      withNullSafe(o => getStringWritable(o))
    case _: StringObjectInspector =>
      withNullSafe(o => o.asInstanceOf[UTF8String].toString())
    case _: IntObjectInspector if x.preferWritable() =>
      withNullSafe(o => getIntWritable(o))
    case _: IntObjectInspector =>
      withNullSafe(o => o.asInstanceOf[java.lang.Integer])
    case _: BooleanObjectInspector if x.preferWritable() =>
      withNullSafe(o => getBooleanWritable(o))
    case _: BooleanObjectInspector =>
      withNullSafe(o => o.asInstanceOf[java.lang.Boolean])
    case _: FloatObjectInspector if x.preferWritable() =>
      withNullSafe(o => getFloatWritable(o))
    case _: FloatObjectInspector =>
      withNullSafe(o => o.asInstanceOf[java.lang.Float])
    case _: DoubleObjectInspector if x.preferWritable() =>
      withNullSafe(o => getDoubleWritable(o))
    case _: DoubleObjectInspector =>
      withNullSafe(o => o.asInstanceOf[java.lang.Double])
    case _: LongObjectInspector if x.preferWritable() =>
      withNullSafe(o => getLongWritable(o))
    case _: LongObjectInspector =>
      withNullSafe(o => o.asInstanceOf[java.lang.Long])
    case _: ShortObjectInspector if x.preferWritable() =>
      withNullSafe(o => getShortWritable(o))
    case _: ShortObjectInspector =>
      withNullSafe(o => o.asInstanceOf[java.lang.Short])
    case _: ByteObjectInspector if x.preferWritable() =>
      withNullSafe(o => getByteWritable(o))
    case _: ByteObjectInspector =>
      withNullSafe(o => o.asInstanceOf[java.lang.Byte])
    // Spark passes Hive varchar and char values as plain strings (UTF8String). The Hive object
    // is built with a max length equal to the value's own length.
    case _: HiveVarcharObjectInspector if x.preferWritable() =>
      withNullSafe(o => getStringWritable(o))
    case _: HiveVarcharObjectInspector =>
      withNullSafe { o =>
        val s = o.asInstanceOf[UTF8String].toString
        new HiveVarchar(s, s.length)
      }
    case _: HiveCharObjectInspector if x.preferWritable() =>
      withNullSafe(o => getStringWritable(o))
    case _: HiveCharObjectInspector =>
      withNullSafe { o =>
        val s = o.asInstanceOf[UTF8String].toString
        new HiveChar(s, s.length)
      }
    // The Java-backed inspectors are matched before the generic decimal/date/timestamp cases
    // below and always produce Java objects, regardless of preferWritable().
    case _: JavaHiveDecimalObjectInspector =>
      withNullSafe(o =>
        HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal))
    case _: JavaDateObjectInspector =>
      withNullSafe(o =>
        DateTimeUtils.toJavaDate(o.asInstanceOf[Int]))
    case _: JavaTimestampObjectInspector =>
      withNullSafe(o =>
        DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long]))
    case _: HiveDecimalObjectInspector if x.preferWritable() =>
      withNullSafe(o => getDecimalWritable(o.asInstanceOf[Decimal]))
    case _: HiveDecimalObjectInspector =>
      withNullSafe(o =>
        HiveDecimal.create(o.asInstanceOf[Decimal].toJavaBigDecimal))
    case _: BinaryObjectInspector if x.preferWritable() =>
      withNullSafe(o => getBinaryWritable(o))
    case _: BinaryObjectInspector =>
      withNullSafe(o => o.asInstanceOf[Array[Byte]])
    case _: DateObjectInspector if x.preferWritable() =>
      withNullSafe(o => getDateWritable(o))
    case _: DateObjectInspector =>
      withNullSafe(o => DateTimeUtils.toJavaDate(o.asInstanceOf[Int]))
    case _: TimestampObjectInspector if x.preferWritable() =>
      withNullSafe(o => getTimestampWritable(o))
    case _: TimestampObjectInspector =>
      withNullSafe(o => DateTimeUtils.toJavaTimestamp(o.asInstanceOf[Long]))
    case _: HiveIntervalDayTimeObjectInspector if x.preferWritable() =>
      withNullSafe(o => getHiveIntervalDayTimeWritable(o))
    case _: HiveIntervalDayTimeObjectInspector =>
      withNullSafe { o =>
        val duration = IntervalUtils.microsToDuration(o.asInstanceOf[Long])
        new HiveIntervalDayTime(duration.getSeconds, duration.getNano)
      }
    case _: HiveIntervalYearMonthObjectInspector if x.preferWritable() =>
      withNullSafe(o => getHiveIntervalYearMonthWritable(o))
    case _: HiveIntervalYearMonthObjectInspector =>
      withNullSafe(o => new HiveIntervalYearMonth(o.asInstanceOf[Int]))
    case _: VoidObjectInspector =>
      (_: Any) => null // always be null for void object inspector
  }
  case soi: StandardStructObjectInspector =>
    val schema = dataType.asInstanceOf[StructType]
    // Field refs, Catalyst field types and child wrappers do not depend on the row. Resolve them
    // once here instead of re-zipping (and allocating tuples) for every converted row.
    val fields = soi.getAllStructFieldRefs.asScala.zip(schema.fields).toIndexedSeq
    val fieldRefs = fields.map(_._1)
    val fieldTypes = fields.map(_._2.dataType)
    val wrappers = fields.map {
      case (ref, field) => wrapperFor(ref.getFieldObjectInspector, field.dataType)
    }
    withNullSafe { o =>
      val struct = soi.create()
      val row = o.asInstanceOf[InternalRow]
      var i = 0
      while (i < fieldRefs.length) {
        soi.setStructFieldData(struct, fieldRefs(i), wrappers(i)(row.get(i, fieldTypes(i))))
        i += 1
      }
      struct
    }
  case ssoi: SettableStructObjectInspector =>
    val structType = dataType.asInstanceOf[StructType]
    // Row-invariant metadata, resolved once (see the StandardStructObjectInspector case).
    val fields = ssoi.getAllStructFieldRefs.asScala.zip(structType).toIndexedSeq
    val fieldRefs = fields.map(_._1)
    val fieldTypes = fields.map(_._2.dataType)
    val wrappers = fields.map {
      case (ref, field) => wrapperFor(ref.getFieldObjectInspector, field.dataType)
    }
    withNullSafe { o =>
      val row = o.asInstanceOf[InternalRow]
      // Create the target struct object (most likely a POJO), then populate it field by field.
      val result = ssoi.create()
      var i = 0
      while (i < fieldRefs.length) {
        ssoi.setStructFieldData(
          result,
          fieldRefs(i),
          wrappers(i)(row.get(i, fieldTypes(i))).asInstanceOf[AnyRef])
        i += 1
      }
      result
    }
  case soi: StructObjectInspector =>
    val structType = dataType.asInstanceOf[StructType]
    // Row-invariant metadata, resolved once. The field refs are only needed to build the child
    // wrappers; the per-row output is a positional list.
    val fields = soi.getAllStructFieldRefs.asScala.zip(structType).toIndexedSeq
    val fieldTypes = fields.map(_._2.dataType)
    val wrappers = fields.map {
      case (ref, field) => wrapperFor(ref.getFieldObjectInspector, field.dataType)
    }
    withNullSafe { o =>
      val row = o.asInstanceOf[InternalRow]
      // This inspector is not settable, so the struct is represented as a list of field values.
      val result = new java.util.ArrayList[AnyRef](wrappers.size)
      var i = 0
      while (i < wrappers.length) {
        result.add(wrappers(i)(row.get(i, fieldTypes(i))).asInstanceOf[AnyRef])
        i += 1
      }
      result
    }
  case loi: ListObjectInspector =>
    val elementType = dataType.asInstanceOf[ArrayType].elementType
    val wrapper = wrapperFor(loi.getListElementObjectInspector, elementType)
    withNullSafe { o =>
      val array = o.asInstanceOf[ArrayData]
      val values = new java.util.ArrayList[Any](array.numElements())
      array.foreach(elementType, (_, e) => values.add(wrapper(e)))
      values
    }
  case moi: MapObjectInspector =>
    val mt = dataType.asInstanceOf[MapType]
    val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector, mt.keyType)
    val valueWrapper = wrapperFor(moi.getMapValueObjectInspector, mt.valueType)
    withNullSafe { o =>
      val map = o.asInstanceOf[MapData]
      val jmap = new java.util.HashMap[Any, Any](map.numElements())
      map.foreach(mt.keyType, mt.valueType, (k, v) =>
        jmap.put(keyWrapper(k), valueWrapper(v)))
      jmap
    }
  case _ =>
    identity[Any]
}