in flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java [345:459]
/**
 * Converts a Hive object into its Flink counterpart, as described by the given {@link
 * ObjectInspector}.
 *
 * <p>Primitive values are unwrapped to their Java representation (with dates/timestamps going
 * through the {@link HiveShim} for version-specific handling), lists become typed arrays, maps
 * become {@link Map}s, and structs become {@link Row}s; conversion recurses into nested types.
 *
 * @param inspector the Hive inspector describing {@code data}
 * @param data the Hive object to convert; may be {@code null}
 * @param hiveShim shim for Hive-version-dependent date/timestamp conversion
 * @return the converted Flink object, or {@code null} for SQL NULL
 * @throws FlinkHiveUDFException if the inspector's type is not supported
 */
public static Object toFlinkObject(ObjectInspector inspector, Object data, HiveShim hiveShim) {
    // SQL NULL arrives either as a null value or as a VoidObjectInspector.
    if (data == null || inspector instanceof VoidObjectInspector) {
        return null;
    }
    if (inspector instanceof PrimitiveObjectInspector) {
        PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector) inspector;
        // These primitive types already use the Java representation Flink expects.
        boolean directMapping =
                inspector instanceof BooleanObjectInspector
                        || inspector instanceof StringObjectInspector
                        || inspector instanceof ByteObjectInspector
                        || inspector instanceof ShortObjectInspector
                        || inspector instanceof IntObjectInspector
                        || inspector instanceof LongObjectInspector
                        || inspector instanceof FloatObjectInspector
                        || inspector instanceof DoubleObjectInspector
                        || inspector instanceof BinaryObjectInspector;
        if (directMapping) {
            return primitiveInspector.getPrimitiveJavaObject(data);
        }
        // Dates and timestamps differ between Hive versions; delegate to the shim.
        if (inspector instanceof DateObjectInspector) {
            return hiveShim.toFlinkDate(primitiveInspector.getPrimitiveJavaObject(data));
        }
        if (inspector instanceof TimestampObjectInspector) {
            return hiveShim.toFlinkTimestamp(primitiveInspector.getPrimitiveJavaObject(data));
        }
        // CHAR/VARCHAR wrappers are unwrapped to their plain String value.
        if (inspector instanceof HiveCharObjectInspector) {
            return ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(data).getValue();
        }
        if (inspector instanceof HiveVarcharObjectInspector) {
            return ((HiveVarcharObjectInspector) inspector)
                    .getPrimitiveJavaObject(data)
                    .getValue();
        }
        if (inspector instanceof HiveDecimalObjectInspector) {
            return ((HiveDecimalObjectInspector) inspector)
                    .getPrimitiveJavaObject(data)
                    .bigDecimalValue();
        }
        if (inspector instanceof HiveIntervalYearMonthObjectInspector) {
            HiveIntervalYearMonth interval =
                    ((HiveIntervalYearMonthObjectInspector) inspector)
                            .getPrimitiveJavaObject(data);
            return Period.of(interval.getYears(), interval.getMonths(), 0);
        }
        if (inspector instanceof HiveIntervalDayTimeObjectInspector) {
            HiveIntervalDayTime interval =
                    ((HiveIntervalDayTimeObjectInspector) inspector)
                            .getPrimitiveJavaObject(data);
            return Duration.ofSeconds(interval.getTotalSeconds(), interval.getNanos());
        }
        // Unhandled primitive inspectors fall through to the final throw below.
    }
    if (inspector instanceof ListObjectInspector) {
        ListObjectInspector arrayInspector = (ListObjectInspector) inspector;
        List<?> elements = arrayInspector.getList(data);
        if (elements == null) {
            return null;
        }
        ObjectInspector elementInspector = arrayInspector.getListElementObjectInspector();
        // Flink expects a specifically-typed array (e.g. Integer[] rather than Object[]),
        // so allocate via the element type's conversion class.
        Class<?> elementClass = HiveTypeUtil.toFlinkType(elementInspector).getConversionClass();
        Object[] converted = (Object[]) Array.newInstance(elementClass, elements.size());
        int pos = 0;
        for (Object element : elements) {
            converted[pos++] = toFlinkObject(elementInspector, element, hiveShim);
        }
        return converted;
    }
    if (inspector instanceof MapObjectInspector) {
        MapObjectInspector mapInspector = (MapObjectInspector) inspector;
        Map<?, ?> entries = mapInspector.getMap(data);
        if (entries == null) {
            return null;
        }
        ObjectInspector keyInspector = mapInspector.getMapKeyObjectInspector();
        ObjectInspector valueInspector = mapInspector.getMapValueObjectInspector();
        Map<Object, Object> converted =
                CollectionUtil.newHashMapWithExpectedSize(entries.size());
        for (Map.Entry<?, ?> entry : entries.entrySet()) {
            Object key = toFlinkObject(keyInspector, entry.getKey(), hiveShim);
            Object value = toFlinkObject(valueInspector, entry.getValue(), hiveShim);
            converted.put(key, value);
        }
        return converted;
    }
    if (inspector instanceof StructObjectInspector) {
        StructObjectInspector structInspector = (StructObjectInspector) inspector;
        List<? extends StructField> fieldRefs = structInspector.getAllStructFieldRefs();
        // StandardStructObjectInspector.getStructFieldData in Hive-1.2.1 only accepts array
        // or list as data, so wrap a bare scalar into a one-element array first.
        if (inspector instanceof StandardStructObjectInspector
                && !(data instanceof List)
                && !data.getClass().isArray()) {
            data = new Object[] {data};
        }
        Row row = new Row(fieldRefs.size());
        for (int pos = 0; pos < fieldRefs.size(); pos++) {
            StructField fieldRef = fieldRefs.get(pos);
            Object fieldData = structInspector.getStructFieldData(data, fieldRef);
            row.setField(
                    pos, toFlinkObject(fieldRef.getFieldObjectInspector(), fieldData, hiveShim));
        }
        return row;
    }
    throw new FlinkHiveUDFException(
            String.format("Unwrap does not support ObjectInspector '%s' yet", inspector));
}