in flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java [206:488]
/**
 * Creates a function that converts an external object into the value used for the given
 * {@link LogicalType}.
 *
 * <p>The returned function is lenient: an input whose runtime class is not one of the classes
 * accepted for the type (including {@code null}) is mapped to {@code null} instead of being
 * rejected. Nested types (array, map, row) are handled by recursively created converters.
 *
 * <p>When applied, the returned function may itself throw {@link IllegalStateException}: for
 * rows whose number of values does not match the schema, and for maps whose converted keys
 * collide.
 *
 * @param logicalType the type to create a converter for
 * @return a function converting a single value of that type
 * @throws IllegalStateException if no converter is available for {@code logicalType}
 */
private static Function<Object, Object> converter(LogicalType logicalType) {
    if (logicalType instanceof NullType) {
        return n -> null;
    }
    if (logicalType instanceof BooleanType) {
        return b -> b instanceof Boolean ? b : null;
    }
    // Integral types accept any of Byte/Short/Integer/Long and narrow or widen to the target
    // width. Narrowing uses byteValue()/shortValue()/intValue(), i.e. it truncates silently.
    if (logicalType instanceof TinyIntType) {
        return c -> {
            if (c instanceof Byte) {
                return c;
            }
            if (c instanceof Short) {
                return ((Short) c).byteValue();
            }
            if (c instanceof Integer) {
                return ((Integer) c).byteValue();
            }
            if (c instanceof Long) {
                return ((Long) c).byteValue();
            }
            return null;
        };
    }
    if (logicalType instanceof SmallIntType) {
        return c -> {
            if (c instanceof Byte) {
                return ((Byte) c).shortValue();
            }
            if (c instanceof Short) {
                return c;
            }
            if (c instanceof Integer) {
                return ((Integer) c).shortValue();
            }
            if (c instanceof Long) {
                return ((Long) c).shortValue();
            }
            return null;
        };
    }
    if (logicalType instanceof IntType) {
        return c -> {
            if (c instanceof Byte) {
                return ((Byte) c).intValue();
            }
            if (c instanceof Short) {
                return ((Short) c).intValue();
            }
            if (c instanceof Integer) {
                return c;
            }
            if (c instanceof Long) {
                return ((Long) c).intValue();
            }
            return null;
        };
    }
    if (logicalType instanceof BigIntType) {
        return c -> {
            if (c instanceof Byte) {
                return ((Byte) c).longValue();
            }
            if (c instanceof Short) {
                return ((Short) c).longValue();
            }
            if (c instanceof Integer) {
                return ((Integer) c).longValue();
            }
            if (c instanceof Long) {
                return c;
            }
            return null;
        };
    }
    if (logicalType instanceof FloatType) {
        return c -> {
            if (c instanceof Float) {
                return c;
            }
            if (c instanceof Double) {
                return ((Double) c).floatValue();
            }
            return null;
        };
    }
    if (logicalType instanceof DoubleType) {
        return c -> {
            if (c instanceof Float) {
                return ((Float) c).doubleValue();
            }
            if (c instanceof Double) {
                return c;
            }
            return null;
        };
    }
    if (logicalType instanceof DecimalType) {
        int precision = ((DecimalType) logicalType).getPrecision();
        int scale = ((DecimalType) logicalType).getScale();
        return c ->
                c instanceof BigDecimal
                        ? DecimalData.fromBigDecimal((BigDecimal) c, precision, scale)
                        : null;
    }
    if (logicalType instanceof DateType) {
        return c -> {
            if (c instanceof Integer) {
                return (Integer) c;
            }
            return null;
        };
    }
    if (logicalType instanceof TimeType) {
        return c -> {
            if (c instanceof Integer || c instanceof Long) {
                // The input is scaled down by 1000 and then treated as milliseconds
                // (presumably the input is in microseconds - confirm against the caller).
                long millisLocal = ((Number) c).longValue() / 1000;
                long millisUtc = millisLocal + getOffsetFromLocalMillis(millisLocal);
                return (int) millisUtc;
            }
            return null;
        };
    }
    if (logicalType instanceof TimestampType) {
        // The scaled value is read as epoch milliseconds and rendered as a local date-time in
        // the JVM's default time zone.
        return c ->
                c instanceof Integer || c instanceof Long
                        ? TimestampData.fromLocalDateTime(
                                Instant.ofEpochMilli(((Number) c).longValue() / 1000)
                                        .atZone(ZoneId.systemDefault())
                                        .toLocalDateTime())
                        : null;
    }
    if (logicalType instanceof ZonedTimestampType) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? TimestampData.fromInstant(
                                Instant.ofEpochMilli(((Number) c).longValue() / 1000))
                        : null;
    }
    if (logicalType instanceof LocalZonedTimestampType) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? TimestampData.fromInstant(
                                Instant.ofEpochMilli(((Number) c).longValue() / 1000))
                        : null;
    }
    if (logicalType instanceof DayTimeIntervalType) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? ((Number) c).longValue() / 1000
                        : null;
    }
    if (logicalType instanceof YearMonthIntervalType) {
        // NOTE(review): uses the same /1000 scaling and Long result as the day-time interval;
        // confirm this is the representation expected for year-month intervals.
        return c ->
                c instanceof Integer || c instanceof Long
                        ? ((Number) c).longValue() / 1000
                        : null;
    }
    if (logicalType instanceof CharType || logicalType instanceof VarCharType) {
        // Any non-null object is accepted; its toString() form becomes the string value.
        return c -> c != null ? StringData.fromString(c.toString()) : null;
    }
    if (logicalType instanceof BinaryType || logicalType instanceof VarBinaryType) {
        return c -> {
            if (c instanceof String) {
                return ((String) c).getBytes(StandardCharsets.UTF_8);
            }
            if (c instanceof byte[]) {
                return c;
            }
            return null;
        };
    }
    if (logicalType instanceof ArrayType) {
        LogicalType elementType = ((ArrayType) logicalType).getElementType();
        Function<Object, Object> elementConverter = converter(elementType);
        BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor =
                arrayConstructor(elementType);
        return c -> {
            // Both java.util.List and native Java arrays are accepted as input; elements are
            // converted lazily through the index-based getter handed to the constructor.
            int length = -1;
            Function<Integer, Object> elementGetter = null;
            if (c instanceof List) {
                length = ((List<?>) c).size();
                elementGetter = i -> elementConverter.apply(((List<?>) c).get(i));
            }
            if (c != null && c.getClass().isArray()) {
                length = Array.getLength(c);
                elementGetter = i -> elementConverter.apply(Array.get(c, i));
            }
            if (elementGetter != null) {
                return arrayConstructor.apply(length, elementGetter);
            }
            return null;
        };
    }
    if (logicalType instanceof MultisetType) {
        // Multisets are passed through unchanged.
        return c -> c;
    }
    if (logicalType instanceof MapType) {
        Function<Object, Object> keyConverter = converter(((MapType) logicalType).getKeyType());
        Function<Object, Object> valueConverter =
                converter(((MapType) logicalType).getValueType());
        return c -> {
            if (c instanceof Map) {
                // Filled with an explicit loop rather than Collectors.toMap: that collector
                // throws a NullPointerException as soon as a converted value is null, which
                // happens for every null (or unconvertible) map value.
                Map<Object, Object> mapData = new java.util.HashMap<>();
                for (Map.Entry<?, ?> entry : ((Map<?, ?>) c).entrySet()) {
                    Object key = keyConverter.apply(entry.getKey());
                    Object value = valueConverter.apply(entry.getValue());
                    // Keep failing on colliding converted keys (as Collectors.toMap did)
                    // instead of silently dropping an entry.
                    if (mapData.containsKey(key)) {
                        throw new IllegalStateException(
                                "Duplicate key "
                                        + key
                                        + " (attempted merging values "
                                        + mapData.get(key)
                                        + " and "
                                        + value
                                        + ")");
                    }
                    mapData.put(key, value);
                }
                return new GenericMapData(mapData);
            } else {
                return null;
            }
        };
    }
    if (logicalType instanceof RowType) {
        LogicalType[] fieldTypes = logicalType.getChildren().toArray(new LogicalType[0]);
        List<Function<Object, Object>> fieldConverters =
                Arrays.stream(fieldTypes)
                        .map(PythonTableUtils::converter)
                        .collect(Collectors.toList());
        return c -> {
            if (c != null && c.getClass().isArray()) {
                // Element 0 of the input array carries the row kind as a byte value; the
                // remaining elements are the field values, hence the "length - 1" arity.
                int length = Array.getLength(c);
                if (length - 1 != fieldTypes.length) {
                    throw new IllegalStateException(
                            "Input row doesn't have expected number of values required by the schema. "
                                    + fieldTypes.length
                                    + " fields are required while "
                                    + (length - 1)
                                    + " values are provided.");
                }
                GenericRowData row = new GenericRowData(length - 1);
                row.setRowKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));
                for (int i = 0; i < row.getArity(); i++) {
                    row.setField(i, fieldConverters.get(i).apply(Array.get(c, i + 1)));
                }
                return row;
            }
            return null;
        };
    } else if (logicalType instanceof StructuredType) {
        // Only structured types implemented by ListView or MapView are supported; they are
        // converted using the converter of their first child type.
        Optional<Class<?>> implClass = ((StructuredType) logicalType).getImplementationClass();
        if (implClass.isPresent()
                && (implClass.get() == ListView.class || implClass.get() == MapView.class)) {
            return converter(logicalType.getChildren().get(0));
        }
        throw new IllegalStateException(
                "Failed to get the data converter for StructuredType with implementation "
                        + "class: "
                        + implClass.orElse(null));
    }
    throw new IllegalStateException("Failed to get converter for LogicalType: " + logicalType);
}