in flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java [1679:1986]
/**
 * Creates a function that converts an incoming object into the Java value expected for {@code
 * dataType}.
 *
 * <p>The returned function is lenient: in most branches an input whose class is not one of the
 * accepted ones (including {@code null}) is converted to {@code null} instead of raising an
 * error. The supported conversions are:
 *
 * <ul>
 *   <li>Boolean and BigDecimal: passed through when the class matches.
 *   <li>Byte/short/int/long: any of {@code Byte}, {@code Short}, {@code Integer}, {@code Long}
 *       is accepted and widened or narrowed with the corresponding {@code xxxValue()} call
 *       (narrowing is not range checked).
 *   <li>Float/double: {@code Float} and {@code Double} are accepted and converted to the
 *       target width.
 *   <li>SQL date / local date: an {@code Integer} that is multiplied by the number of
 *       milliseconds per day and shifted by {@code getOffsetFromLocalMillis}.
 *   <li>Time, timestamp, local time, local date-time, instant and millisecond intervals: an
 *       {@code Integer} or {@code Long} that is divided by 1000 before being used as a
 *       millisecond value (NOTE(review): the input is presumably in microseconds - confirm
 *       against the producer).
 *   <li>String: {@code toString()} of any non-null input.
 *   <li>{@code byte[]}: a {@code String} is encoded as UTF-8, a {@code byte[]} is passed
 *       through.
 *   <li>Arrays: a {@code List} or any Java array, converted element by element.
 *   <li>Maps: a {@code Map}, with keys and values converted recursively. {@code null} values
 *       are preserved.
 *   <li>Rows: a Java array whose first element is the row kind byte followed by the field
 *       values.
 *   <li>Tuples: a Java array holding exactly the tuple fields.
 *   <li>Anything else: a {@code byte[]} is deserialized with the serializer of {@code
 *       dataType}, unless the type is a {@code PickledByteArrayTypeInfo}; every other input
 *       is returned unchanged.
 * </ul>
 *
 * <p>The returned function throws {@link IllegalStateException} when a row or tuple array does
 * not have the number of values required by the schema, when two map keys are equal after
 * conversion, or when deserialization of a {@code byte[]} fails.
 *
 * @param dataType the type information describing the desired Java value
 * @param config the execution config used to obtain the serializer config for the fallback
 *     branch; it is also passed on to the converters of nested types
 * @return a function converting a raw object into the value described by {@code dataType}
 */
private static Function<Object, Object> converter(
        final TypeInformation<?> dataType, final ExecutionConfig config) {
    if (dataType.equals(Types.BOOLEAN)) {
        return b -> b instanceof Boolean ? b : null;
    }
    if (dataType.equals(Types.BYTE)) {
        return c -> {
            if (c instanceof Byte) {
                return c;
            }
            if (c instanceof Short) {
                return ((Short) c).byteValue();
            }
            if (c instanceof Integer) {
                return ((Integer) c).byteValue();
            }
            if (c instanceof Long) {
                return ((Long) c).byteValue();
            }
            return null;
        };
    }
    if (dataType.equals(Types.SHORT)) {
        return c -> {
            if (c instanceof Byte) {
                return ((Byte) c).shortValue();
            }
            if (c instanceof Short) {
                return c;
            }
            if (c instanceof Integer) {
                return ((Integer) c).shortValue();
            }
            if (c instanceof Long) {
                return ((Long) c).shortValue();
            }
            return null;
        };
    }
    if (dataType.equals(Types.INT)) {
        return c -> {
            if (c instanceof Byte) {
                return ((Byte) c).intValue();
            }
            if (c instanceof Short) {
                return ((Short) c).intValue();
            }
            if (c instanceof Integer) {
                return c;
            }
            if (c instanceof Long) {
                return ((Long) c).intValue();
            }
            return null;
        };
    }
    if (dataType.equals(Types.LONG)) {
        return c -> {
            if (c instanceof Byte) {
                return ((Byte) c).longValue();
            }
            if (c instanceof Short) {
                return ((Short) c).longValue();
            }
            if (c instanceof Integer) {
                return ((Integer) c).longValue();
            }
            if (c instanceof Long) {
                return c;
            }
            return null;
        };
    }
    if (dataType.equals(Types.FLOAT)) {
        return c -> {
            if (c instanceof Float) {
                return c;
            }
            if (c instanceof Double) {
                return ((Double) c).floatValue();
            }
            return null;
        };
    }
    if (dataType.equals(Types.DOUBLE)) {
        return c -> {
            if (c instanceof Float) {
                return ((Float) c).doubleValue();
            }
            if (c instanceof Double) {
                return c;
            }
            return null;
        };
    }
    if (dataType.equals(Types.BIG_DEC)) {
        return c -> c instanceof BigDecimal ? c : null;
    }
    if (dataType.equals(Types.SQL_DATE)) {
        return c -> {
            if (c instanceof Integer) {
                // 86400000 is the number of milliseconds per day; the multiplication is done
                // in long arithmetic so it cannot overflow.
                long millisLocal = ((Integer) c).longValue() * 86400000;
                long millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal);
                return new Date(millisUtc);
            }
            return null;
        };
    }
    if (dataType.equals(Types.SQL_TIME)) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? new Time(((Number) c).longValue() / 1000)
                        : null;
    }
    if (dataType.equals(Types.SQL_TIMESTAMP)) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? new Timestamp(((Number) c).longValue() / 1000)
                        : null;
    }
    if (dataType.equals(Types.LOCAL_DATE)) {
        return c -> {
            if (c instanceof Integer) {
                // Same day-count handling as the SQL date branch, then rendered in the
                // system default zone.
                long millisLocal = ((Integer) c).longValue() * 86400000;
                long millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal);
                return Instant.ofEpochMilli(millisUtc)
                        .atZone(ZoneId.systemDefault())
                        .toLocalDate();
            }
            return null;
        };
    }
    if (dataType.equals(Types.LOCAL_TIME)) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
                                .atZone(ZoneId.systemDefault())
                                .toLocalTime()
                        : null;
    }
    if (dataType.equals(Types.LOCAL_DATE_TIME)) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
                                .atZone(ZoneId.systemDefault())
                                .toLocalDateTime()
                        : null;
    }
    if (dataType.equals(org.apache.flink.api.common.typeinfo.Types.INSTANT)) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
                        : null;
    }
    if (dataType.equals(TimeIntervalTypeInfo.INTERVAL_MILLIS)) {
        return c ->
                c instanceof Integer || c instanceof Long
                        ? ((Number) c).longValue() / 1000
                        : null;
    }
    if (dataType.equals(Types.STRING)) {
        return c -> c != null ? c.toString() : null;
    }
    // byte[] is handled before the generic array branch so that a String can be accepted and
    // encoded as UTF-8.
    if (dataType.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
        return c -> {
            if (c instanceof String) {
                return ((String) c).getBytes(StandardCharsets.UTF_8);
            }
            if (c instanceof byte[]) {
                return c;
            }
            return null;
        };
    }
    if (dataType instanceof PrimitiveArrayTypeInfo
            || dataType instanceof BasicArrayTypeInfo
            || dataType instanceof ObjectArrayTypeInfo) {
        TypeInformation<?> elementType =
                dataType instanceof PrimitiveArrayTypeInfo
                        ? ((PrimitiveArrayTypeInfo<?>) dataType).getComponentType()
                        : dataType instanceof BasicArrayTypeInfo
                                ? ((BasicArrayTypeInfo<?, ?>) dataType).getComponentInfo()
                                : ((ObjectArrayTypeInfo<?, ?>) dataType).getComponentInfo();
        boolean primitive = dataType instanceof PrimitiveArrayTypeInfo;
        // The element converter and the array constructor are resolved once, not per record.
        Function<Object, Object> elementConverter = converter(elementType, config);
        BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor =
                arrayConstructor(elementType, primitive);
        return c -> {
            int length = -1;
            Function<Integer, Object> elementGetter = null;
            if (c instanceof List) {
                length = ((List<?>) c).size();
                elementGetter = i -> elementConverter.apply(((List<?>) c).get(i));
            }
            if (c != null && c.getClass().isArray()) {
                length = Array.getLength(c);
                elementGetter = i -> elementConverter.apply(Array.get(c, i));
            }
            // elementGetter stays null when the input is neither a List nor an array.
            if (elementGetter != null) {
                return arrayConstructor.apply(length, elementGetter);
            }
            return null;
        };
    }
    if (dataType instanceof MapTypeInfo) {
        Function<Object, Object> keyConverter =
                converter(((MapTypeInfo<?, ?>) dataType).getKeyTypeInfo(), config);
        Function<Object, Object> valueConverter =
                converter(((MapTypeInfo<?, ?>) dataType).getValueTypeInfo(), config);
        return c -> {
            if (!(c instanceof Map)) {
                return null;
            }
            // Collectors.toMap rejects null values with a NullPointerException, but the
            // value converter legitimately returns null (null input or unexpected class), so
            // the result map is filled explicitly.
            Map<Object, Object> converted = new java.util.HashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) c).entrySet()) {
                Object key = keyConverter.apply(entry.getKey());
                Object value = valueConverter.apply(entry.getValue());
                // Distinct input keys may collapse to the same converted key (e.g. through
                // narrowing); fail instead of silently dropping an entry.
                if (converted.containsKey(key)) {
                    throw new IllegalStateException("Duplicate key " + key);
                }
                converted.put(key, value);
            }
            return converted;
        };
    }
    if (dataType instanceof RowTypeInfo) {
        TypeInformation<?>[] fieldTypes = ((RowTypeInfo) dataType).getFieldTypes();
        List<Function<Object, Object>> fieldConverters =
                Arrays.stream(fieldTypes)
                        .map(x -> converter(x, config))
                        .collect(Collectors.toList());
        return c -> {
            if (c != null && c.getClass().isArray()) {
                // Element 0 carries the row kind, so the number of field values is
                // length - 1.
                int length = Array.getLength(c);
                if (length - 1 != fieldTypes.length) {
                    throw new IllegalStateException(
                            "Input row doesn't have expected number of values required by the schema. "
                                    + fieldTypes.length
                                    + " fields are required while "
                                    + (length - 1)
                                    + " values are provided.");
                }
                Row row = new Row(length - 1);
                row.setKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));
                for (int i = 0; i < row.getArity(); i++) {
                    row.setField(i, fieldConverters.get(i).apply(Array.get(c, i + 1)));
                }
                return row;
            }
            return null;
        };
    }
    if (dataType instanceof TupleTypeInfo) {
        TypeInformation<?>[] fieldTypes = ((TupleTypeInfo<?>) dataType).getFieldTypes();
        List<Function<Object, Object>> fieldConverters =
                Arrays.stream(fieldTypes)
                        .map(x -> converter(x, config))
                        .collect(Collectors.toList());
        return c -> {
            if (c != null && c.getClass().isArray()) {
                int length = Array.getLength(c);
                if (length != fieldTypes.length) {
                    throw new IllegalStateException(
                            "Input tuple doesn't have expected number of values required by the schema. "
                                    + fieldTypes.length
                                    + " fields are required while "
                                    + length
                                    + " values are provided.");
                }
                Tuple tuple = Tuple.newInstance(length);
                for (int i = 0; i < tuple.getArity(); i++) {
                    tuple.setField(fieldConverters.get(i).apply(Array.get(c, i)), i);
                }
                return tuple;
            }
            return null;
        };
    }
    // Fallback for every type without a dedicated branch above.
    return c -> {
        // Null, non-byte[] inputs and pickled byte arrays are handed back unchanged.
        if (c == null
                || c.getClass() != byte[].class
                || dataType instanceof PickledByteArrayTypeInfo) {
            return c;
        }
        // other typeinfos will use the corresponding serializer to deserialize data.
        byte[] b = (byte[]) c;
        TypeSerializer<?> dataSerializer =
                dataType.createSerializer(config.getSerializerConfig());
        ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
        DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(bais);
        bais.setBuffer(b, 0, b.length);
        try {
            return dataSerializer.deserialize(baisWrapper);
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Failed to deserialize the object with datatype " + dataType, e);
        }
    };
}