private static Function converter()

in flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java [1679:1986]


    /**
     * Builds a function that converts an incoming value into the Java representation described by
     * {@code dataType}.
     *
     * <p>For the explicitly handled scalar types the returned function yields {@code null} when the
     * input is {@code null} or is not an instance of one of the accepted classes. Array, map, row and
     * tuple types recursively apply the converters built for their element / key / value / field
     * types. Every other type passes non-{@code byte[]} inputs through unchanged and deserializes
     * {@code byte[]} inputs with the serializer created from {@code dataType}; for
     * {@code PickledByteArrayTypeInfo} the bytes are returned as they are.
     *
     * <p>The returned function throws {@link IllegalStateException} when a row or tuple input does
     * not carry the number of values required by the schema, when two map keys collide after
     * conversion, or when the fallback deserialization fails with an {@link IOException}.
     *
     * @param dataType the type information selecting the conversion
     * @param config the execution config whose serializer config is used to create the fallback
     *     serializer
     * @return the conversion function for {@code dataType}
     */
    private static Function<Object, Object> converter(
            final TypeInformation<?> dataType, final ExecutionConfig config) {
        if (dataType.equals(Types.BOOLEAN)) {
            return b -> b instanceof Boolean ? b : null;
        }
        // Integral types accept any of Byte/Short/Integer/Long and narrow or widen to the target.
        if (dataType.equals(Types.BYTE)) {
            return c -> {
                if (c instanceof Byte) {
                    return c;
                }
                if (c instanceof Short) {
                    return ((Short) c).byteValue();
                }
                if (c instanceof Integer) {
                    return ((Integer) c).byteValue();
                }
                if (c instanceof Long) {
                    return ((Long) c).byteValue();
                }
                return null;
            };
        }
        if (dataType.equals(Types.SHORT)) {
            return c -> {
                if (c instanceof Byte) {
                    return ((Byte) c).shortValue();
                }
                if (c instanceof Short) {
                    return c;
                }
                if (c instanceof Integer) {
                    return ((Integer) c).shortValue();
                }
                if (c instanceof Long) {
                    return ((Long) c).shortValue();
                }
                return null;
            };
        }
        if (dataType.equals(Types.INT)) {
            return c -> {
                if (c instanceof Byte) {
                    return ((Byte) c).intValue();
                }
                if (c instanceof Short) {
                    return ((Short) c).intValue();
                }
                if (c instanceof Integer) {
                    return c;
                }
                if (c instanceof Long) {
                    return ((Long) c).intValue();
                }
                return null;
            };
        }
        if (dataType.equals(Types.LONG)) {
            return c -> {
                if (c instanceof Byte) {
                    return ((Byte) c).longValue();
                }
                if (c instanceof Short) {
                    return ((Short) c).longValue();
                }
                if (c instanceof Integer) {
                    return ((Integer) c).longValue();
                }
                if (c instanceof Long) {
                    return c;
                }
                return null;
            };
        }
        // Floating point types accept Float or Double only.
        if (dataType.equals(Types.FLOAT)) {
            return c -> {
                if (c instanceof Float) {
                    return c;
                }
                if (c instanceof Double) {
                    return ((Double) c).floatValue();
                }
                return null;
            };
        }
        if (dataType.equals(Types.DOUBLE)) {
            return c -> {
                if (c instanceof Float) {
                    return ((Float) c).doubleValue();
                }
                if (c instanceof Double) {
                    return c;
                }
                return null;
            };
        }
        if (dataType.equals(Types.BIG_DEC)) {
            return c -> c instanceof BigDecimal ? c : null;
        }
        if (dataType.equals(Types.SQL_DATE)) {
            return c -> {
                if (c instanceof Integer) {
                    // The integer is scaled by the number of milliseconds in a day (in long
                    // arithmetic) and then shifted by the local offset.
                    long millisLocal = ((Integer) c).longValue() * 86400000;
                    long millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal);
                    return new Date(millisUtc);
                }
                return null;
            };
        }

        // NOTE(review): the "/ 1000" in the time-based branches below suggests the incoming value
        // is in microseconds and is reduced to milliseconds - confirm against the producer side.
        if (dataType.equals(Types.SQL_TIME)) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? new Time(((Number) c).longValue() / 1000)
                            : null;
        }

        if (dataType.equals(Types.SQL_TIMESTAMP)) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? new Timestamp(((Number) c).longValue() / 1000)
                            : null;
        }

        if (dataType.equals(Types.LOCAL_DATE)) {
            return c -> {
                if (c instanceof Integer) {
                    long millisLocal = ((Integer) c).longValue() * 86400000;
                    long millisUtc = millisLocal - getOffsetFromLocalMillis(millisLocal);
                    return Instant.ofEpochMilli(millisUtc)
                            .atZone(ZoneId.systemDefault())
                            .toLocalDate();
                }
                return null;
            };
        }

        if (dataType.equals(Types.LOCAL_TIME)) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
                                    .atZone(ZoneId.systemDefault())
                                    .toLocalTime()
                            : null;
        }

        if (dataType.equals(Types.LOCAL_DATE_TIME)) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
                                    .atZone(ZoneId.systemDefault())
                                    .toLocalDateTime()
                            : null;
        }

        if (dataType.equals(org.apache.flink.api.common.typeinfo.Types.INSTANT)) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? Instant.ofEpochMilli(((Number) c).longValue() / 1000)
                            : null;
        }
        if (dataType.equals(TimeIntervalTypeInfo.INTERVAL_MILLIS)) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? ((Number) c).longValue() / 1000
                            : null;
        }
        if (dataType.equals(Types.STRING)) {
            return c -> c != null ? c.toString() : null;
        }
        // byte[] is checked before the generic array branch so that it is returned as-is (or
        // encoded from a String) instead of being rebuilt element by element.
        if (dataType.equals(PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)) {
            return c -> {
                if (c instanceof String) {
                    return ((String) c).getBytes(StandardCharsets.UTF_8);
                }
                if (c instanceof byte[]) {
                    return c;
                }
                return null;
            };
        }
        if (dataType instanceof PrimitiveArrayTypeInfo
                || dataType instanceof BasicArrayTypeInfo
                || dataType instanceof ObjectArrayTypeInfo) {
            TypeInformation<?> elementType =
                    dataType instanceof PrimitiveArrayTypeInfo
                            ? ((PrimitiveArrayTypeInfo<?>) dataType).getComponentType()
                            : dataType instanceof BasicArrayTypeInfo
                                    ? ((BasicArrayTypeInfo<?, ?>) dataType).getComponentInfo()
                                    : ((ObjectArrayTypeInfo<?, ?>) dataType).getComponentInfo();
            boolean primitive = dataType instanceof PrimitiveArrayTypeInfo;
            Function<Object, Object> elementConverter = converter(elementType, config);
            BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor =
                    arrayConstructor(elementType, primitive);
            return c -> {
                // The input may be a List or any Java array; both are read through an index-based
                // getter that converts each element on access.
                int length = -1;
                Function<Integer, Object> elementGetter = null;
                if (c instanceof List) {
                    length = ((List<?>) c).size();
                    elementGetter = i -> elementConverter.apply(((List<?>) c).get(i));
                }
                if (c != null && c.getClass().isArray()) {
                    length = Array.getLength(c);
                    elementGetter = i -> elementConverter.apply(Array.get(c, i));
                }
                if (elementGetter != null) {
                    return arrayConstructor.apply(length, elementGetter);
                }
                return null;
            };
        }
        if (dataType instanceof MapTypeInfo) {
            Function<Object, Object> keyConverter =
                    converter(((MapTypeInfo<?, ?>) dataType).getKeyTypeInfo(), config);
            Function<Object, Object> valueConverter =
                    converter(((MapTypeInfo<?, ?>) dataType).getValueTypeInfo(), config);
            return c -> {
                if (!(c instanceof Map)) {
                    return null;
                }
                // Collectors.toMap rejects null values with a NullPointerException, but the value
                // converter legitimately returns null (null input or unsupported class), so the
                // result map is populated by hand.
                Map<Object, Object> converted = new java.util.HashMap<>();
                for (Map.Entry<?, ?> entry : ((Map<?, ?>) c).entrySet()) {
                    Object key = keyConverter.apply(entry.getKey());
                    Object value = valueConverter.apply(entry.getValue());
                    // Distinct input keys may collide after conversion; fail instead of silently
                    // dropping an entry.
                    if (converted.containsKey(key)) {
                        throw new IllegalStateException("Duplicate key " + key);
                    }
                    converted.put(key, value);
                }
                return converted;
            };
        }
        if (dataType instanceof RowTypeInfo) {
            TypeInformation<?>[] fieldTypes = ((RowTypeInfo) dataType).getFieldTypes();
            List<Function<Object, Object>> fieldConverters =
                    Arrays.stream(fieldTypes)
                            .map(x -> converter(x, config))
                            .collect(Collectors.toList());
            return c -> {
                if (c != null && c.getClass().isArray()) {
                    // Element 0 carries the row kind as a byte value; the fields follow it, hence
                    // the "length - 1" and "i + 1" below.
                    int length = Array.getLength(c);
                    if (length - 1 != fieldTypes.length) {
                        throw new IllegalStateException(
                                "Input row doesn't have expected number of values required by the schema. "
                                        + fieldTypes.length
                                        + " fields are required while "
                                        + (length - 1)
                                        + " values are provided.");
                    }

                    Row row = new Row(length - 1);
                    row.setKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));

                    for (int i = 0; i < row.getArity(); i++) {
                        row.setField(i, fieldConverters.get(i).apply(Array.get(c, i + 1)));
                    }

                    return row;
                }
                return null;
            };
        }
        if (dataType instanceof TupleTypeInfo) {
            TypeInformation<?>[] fieldTypes = ((TupleTypeInfo<?>) dataType).getFieldTypes();
            List<Function<Object, Object>> fieldConverters =
                    Arrays.stream(fieldTypes)
                            .map(x -> converter(x, config))
                            .collect(Collectors.toList());
            return c -> {
                if (c != null && c.getClass().isArray()) {
                    int length = Array.getLength(c);
                    if (length != fieldTypes.length) {
                        throw new IllegalStateException(
                                "Input tuple doesn't have expected number of values required by the schema. "
                                        + fieldTypes.length
                                        + " fields are required while "
                                        + length
                                        + " values are provided.");
                    }

                    Tuple tuple = Tuple.newInstance(length);
                    for (int i = 0; i < tuple.getArity(); i++) {
                        tuple.setField(fieldConverters.get(i).apply(Array.get(c, i)), i);
                    }

                    return tuple;
                }
                return null;
            };
        }

        return c -> {
            // Anything that is not a byte[] is passed through unchanged, and so are the bytes of a
            // PickledByteArrayTypeInfo.
            if (c == null
                    || c.getClass() != byte[].class
                    || dataType instanceof PickledByteArrayTypeInfo) {
                return c;
            }

            // other typeinfos will use the corresponding serializer to deserialize data.
            byte[] b = (byte[]) c;
            TypeSerializer<?> dataSerializer =
                    dataType.createSerializer(config.getSerializerConfig());
            ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
            DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(bais);
            bais.setBuffer(b, 0, b.length);
            try {
                return dataSerializer.deserialize(baisWrapper);
            } catch (IOException e) {
                throw new IllegalStateException(
                        "Failed to deserialize the object with datatype " + dataType, e);
            }
        };
    }