in flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java [191:342]
/**
 * Builds the {@link HiveObjectConversion} matching the given Hive {@link ObjectInspector}.
 *
 * <p>The returned conversion is selected purely from the runtime class of {@code inspector}:
 *
 * <ul>
 *   <li>Primitive inspectors for boolean, string, byte, short, int, long, float, double,
 *       binary and void use {@link IdentityConversion#INSTANCE}.
 *   <li>Date and timestamp inspectors delegate to {@code hiveShim}.
 *   <li>Char, varchar, decimal and the two interval inspectors wrap the incoming value in
 *       the corresponding Hive type.
 *   <li>List, map and struct inspectors recurse into their element / key / value / field
 *       inspectors and convert every contained value.
 * </ul>
 *
 * <p>Every conversion created here maps a {@code null} input to {@code null}, except the
 * ones delegated to {@link IdentityConversion} and {@code hiveShim}, whose null handling is
 * not visible from this method.
 *
 * @param inspector the Hive object inspector describing the target representation
 * @param dataType the logical type of the value to convert; it is cast to {@code CharType},
 *     {@code VarCharType}, {@code ArrayType}, {@code MapType} or {@code RowType} depending on
 *     the inspector kind, so it must match the inspector
 * @param hiveShim the shim used for date/timestamp conversion and for the writable wrapper
 * @return a conversion producing objects for {@code inspector}
 * @throws FlinkHiveUDFException if the inspector is an unsupported primitive inspector, or
 *     is neither a primitive, list, map nor struct inspector
 */
public static HiveObjectConversion getConversion(
        ObjectInspector inspector, LogicalType dataType, HiveShim hiveShim) {
    if (inspector instanceof PrimitiveObjectInspector) {
        HiveObjectConversion conversion;
        if (inspector instanceof BooleanObjectInspector
                || inspector instanceof StringObjectInspector
                || inspector instanceof ByteObjectInspector
                || inspector instanceof ShortObjectInspector
                || inspector instanceof IntObjectInspector
                || inspector instanceof LongObjectInspector
                || inspector instanceof FloatObjectInspector
                || inspector instanceof DoubleObjectInspector
                || inspector instanceof BinaryObjectInspector
                || inspector instanceof VoidObjectInspector) {
            conversion = IdentityConversion.INSTANCE;
        } else if (inspector instanceof DateObjectInspector) {
            conversion = hiveShim::toHiveDate;
        } else if (inspector instanceof TimestampObjectInspector) {
            conversion = hiveShim::toHiveTimestamp;
        } else if (inspector instanceof HiveCharObjectInspector) {
            // The cast of dataType is deliberately kept inside the lambda: it is only
            // evaluated when a non-null value is actually converted.
            conversion =
                    o ->
                            o == null
                                    ? null
                                    : new HiveChar(
                                            (String) o, ((CharType) dataType).getLength());
        } else if (inspector instanceof HiveVarcharObjectInspector) {
            // Same lazy cast as for the char case above.
            conversion =
                    o ->
                            o == null
                                    ? null
                                    : new HiveVarchar(
                                            (String) o, ((VarCharType) dataType).getLength());
        } else if (inspector instanceof HiveDecimalObjectInspector) {
            conversion = o -> o == null ? null : HiveDecimal.create((BigDecimal) o);
        } else if (inspector instanceof HiveIntervalYearMonthObjectInspector) {
            conversion =
                    o -> {
                        if (o == null) {
                            return null;
                        } else {
                            Period period = (Period) o;
                            return new HiveIntervalYearMonth(
                                    period.getYears(), period.getMonths());
                        }
                    };
        } else if (inspector instanceof HiveIntervalDayTimeObjectInspector) {
            conversion =
                    o -> {
                        if (o == null) {
                            return null;
                        } else {
                            Duration duration = (Duration) o;
                            return new HiveIntervalDayTime(
                                    duration.getSeconds(), duration.getNano());
                        }
                    };
        } else {
            throw new FlinkHiveUDFException(
                    "Unsupported primitive object inspector " + inspector.getClass().getName());
        }
        // if the object inspector prefers Writable objects, we should add an extra conversion
        // for that
        // currently this happens for constant arguments for UDFs
        if (((PrimitiveObjectInspector) inspector).preferWritable()) {
            conversion = new WritableHiveObjectConversion(conversion, hiveShim);
        }
        return conversion;
    }

    if (inspector instanceof ListObjectInspector) {
        // The element conversion is resolved once, eagerly; dataType must be an ArrayType.
        HiveObjectConversion eleConvert =
                getConversion(
                        ((ListObjectInspector) inspector).getListElementObjectInspector(),
                        ((ArrayType) dataType).getElementType(),
                        hiveShim);
        return o -> {
            if (o == null) {
                return null;
            }
            Object[] array = (Object[]) o;
            // The final size is known up front, so avoid repeated internal array growth.
            List<Object> result = new ArrayList<>(array.length);
            for (Object ele : array) {
                result.add(eleConvert.toHiveObject(ele));
            }
            return result;
        };
    }

    if (inspector instanceof MapObjectInspector) {
        MapObjectInspector mapInspector = (MapObjectInspector) inspector;
        MapType kvType = (MapType) dataType;
        HiveObjectConversion keyConversion =
                getConversion(
                        mapInspector.getMapKeyObjectInspector(), kvType.getKeyType(), hiveShim);
        HiveObjectConversion valueConversion =
                getConversion(
                        mapInspector.getMapValueObjectInspector(),
                        kvType.getValueType(),
                        hiveShim);
        return o -> {
            if (o == null) {
                return null;
            }
            // Keys and values are only handed on as Object, so an unbounded wildcard map
            // is sufficient and avoids a raw-type / unchecked cast.
            Map<?, ?> map = (Map<?, ?>) o;
            Map<Object, Object> result = CollectionUtil.newHashMapWithExpectedSize(map.size());
            for (Map.Entry<?, ?> entry : map.entrySet()) {
                result.put(
                        keyConversion.toHiveObject(entry.getKey()),
                        valueConversion.toHiveObject(entry.getValue()));
            }
            return result;
        };
    }

    if (inspector instanceof StructObjectInspector) {
        StructObjectInspector structInspector = (StructObjectInspector) inspector;
        List<? extends StructField> structFields = structInspector.getAllStructFieldRefs();
        List<RowType.RowField> rowFields = ((RowType) dataType).getFields();
        // One conversion per struct field, paired with the row field at the same position.
        HiveObjectConversion[] conversions = new HiveObjectConversion[structFields.size()];
        for (int i = 0; i < structFields.size(); i++) {
            conversions[i] =
                    getConversion(
                            structFields.get(i).getFieldObjectInspector(),
                            rowFields.get(i).getType(),
                            hiveShim);
        }
        return o -> {
            if (o == null) {
                return null;
            }
            Row row = (Row) o;
            // NOTE(review): the loop is bounded by the row arity, not by the number of
            // struct fields; a row wider than the struct would index past 'conversions'.
            List<Object> result = new ArrayList<>(row.getArity());
            for (int i = 0; i < row.getArity(); i++) {
                result.add(conversions[i].toHiveObject(row.getField(i)));
            }
            return result;
        };
    }

    throw new FlinkHiveUDFException(
            String.format(
                    "Flink doesn't support convert object conversion for %s yet", inspector));
}