private static Function converter()

in flink-python/src/main/java/org/apache/flink/table/utils/python/PythonTableUtils.java [206:488]


    /**
     * Creates a function that converts a single external value into the object stored for the
     * given {@link LogicalType}.
     *
     * <p>For most types, an input whose Java class is not one of the handled classes (including
     * {@code null}) is converted to {@code null} rather than rejected. Nested types (array, map,
     * row) create their element converters once, up front, by calling this method recursively.
     *
     * <p>The converter returned for a {@link RowType} throws an {@link IllegalStateException} when
     * it is applied to an array whose length does not match the number of row fields plus one.
     * The converter returned for a {@link MapType} throws an {@link IllegalStateException} when
     * two entries convert to the same key.
     *
     * @param logicalType the logical type the converted values must conform to
     * @return a function mapping an external object to its converted representation
     * @throws IllegalStateException if no converter exists for {@code logicalType}
     */
    private static Function<Object, Object> converter(LogicalType logicalType) {

        if (logicalType instanceof NullType) {
            return n -> null;
        }

        if (logicalType instanceof BooleanType) {
            return b -> b instanceof Boolean ? b : null;
        }

        // Integral types: accept any of Byte/Short/Integer/Long and narrow or widen to the target
        // width. Narrowing uses the standard Number conversions, so out-of-range values wrap.
        if (logicalType instanceof TinyIntType) {
            return c -> {
                if (c instanceof Byte) {
                    return c;
                }
                if (c instanceof Short) {
                    return ((Short) c).byteValue();
                }
                if (c instanceof Integer) {
                    return ((Integer) c).byteValue();
                }
                if (c instanceof Long) {
                    return ((Long) c).byteValue();
                }
                return null;
            };
        }

        if (logicalType instanceof SmallIntType) {
            return c -> {
                if (c instanceof Byte) {
                    return ((Byte) c).shortValue();
                }
                if (c instanceof Short) {
                    return c;
                }
                if (c instanceof Integer) {
                    return ((Integer) c).shortValue();
                }
                if (c instanceof Long) {
                    return ((Long) c).shortValue();
                }
                return null;
            };
        }

        if (logicalType instanceof IntType) {
            return c -> {
                if (c instanceof Byte) {
                    return ((Byte) c).intValue();
                }
                if (c instanceof Short) {
                    return ((Short) c).intValue();
                }
                if (c instanceof Integer) {
                    return c;
                }
                if (c instanceof Long) {
                    return ((Long) c).intValue();
                }
                return null;
            };
        }

        if (logicalType instanceof BigIntType) {
            return c -> {
                if (c instanceof Byte) {
                    return ((Byte) c).longValue();
                }
                if (c instanceof Short) {
                    return ((Short) c).longValue();
                }
                if (c instanceof Integer) {
                    return ((Integer) c).longValue();
                }
                if (c instanceof Long) {
                    return c;
                }
                return null;
            };
        }

        // Floating point types: only Float and Double inputs are accepted.
        if (logicalType instanceof FloatType) {
            return c -> {
                if (c instanceof Float) {
                    return c;
                }
                if (c instanceof Double) {
                    return ((Double) c).floatValue();
                }
                return null;
            };
        }
        if (logicalType instanceof DoubleType) {
            return c -> {
                if (c instanceof Float) {
                    return ((Float) c).doubleValue();
                }
                if (c instanceof Double) {
                    return c;
                }
                return null;
            };
        }

        if (logicalType instanceof DecimalType) {
            // Precision and scale are read once here so the lambda does not re-cast per value.
            int precision = ((DecimalType) logicalType).getPrecision();
            int scale = ((DecimalType) logicalType).getScale();
            return c ->
                    c instanceof BigDecimal
                            ? DecimalData.fromBigDecimal((BigDecimal) c, precision, scale)
                            : null;
        }

        if (logicalType instanceof DateType) {
            // Only Integer inputs are accepted and they are passed through unchanged.
            return c -> {
                if (c instanceof Integer) {
                    return (Integer) c;
                }
                return null;
            };
        }

        // Temporal types below divide the incoming number by 1000.
        // NOTE(review): presumably the input is in microseconds and the result in milliseconds;
        // confirm against the producer of these values.
        if (logicalType instanceof TimeType) {
            return c -> {
                if (c instanceof Integer || c instanceof Long) {
                    long millisLocal = ((Number) c).longValue() / 1000;
                    // getOffsetFromLocalMillis is defined elsewhere in this class; judging by the
                    // variable names it shifts a local-time value to UTC (confirm).
                    long millisUtc = millisLocal + getOffsetFromLocalMillis(millisLocal);
                    return (int) millisUtc;
                }
                return null;
            };
        }

        if (logicalType instanceof TimestampType) {
            // The epoch value is interpreted in the JVM's default time zone to obtain the
            // local date-time.
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? TimestampData.fromLocalDateTime(
                                    Instant.ofEpochMilli(((Number) c).longValue() / 1000)
                                            .atZone(ZoneId.systemDefault())
                                            .toLocalDateTime())
                            : null;
        }

        if (logicalType instanceof ZonedTimestampType) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? TimestampData.fromInstant(
                                    Instant.ofEpochMilli(((Number) c).longValue() / 1000))
                            : null;
        }

        if (logicalType instanceof LocalZonedTimestampType) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? TimestampData.fromInstant(
                                    Instant.ofEpochMilli(((Number) c).longValue() / 1000))
                            : null;
        }

        if (logicalType instanceof DayTimeIntervalType) {
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? ((Number) c).longValue() / 1000
                            : null;
        }

        if (logicalType instanceof YearMonthIntervalType) {
            // NOTE(review): this yields a Long divided by 1000, exactly like the day-time
            // interval above; confirm that this is the representation expected for year-month
            // intervals.
            return c ->
                    c instanceof Integer || c instanceof Long
                            ? ((Number) c).longValue() / 1000
                            : null;
        }

        if (logicalType instanceof CharType || logicalType instanceof VarCharType) {
            // Any non-null input is accepted and rendered through toString().
            return c -> c != null ? StringData.fromString(c.toString()) : null;
        }

        if (logicalType instanceof BinaryType || logicalType instanceof VarBinaryType) {
            return c -> {
                if (c instanceof String) {
                    return ((String) c).getBytes(StandardCharsets.UTF_8);
                }
                if (c instanceof byte[]) {
                    return c;
                }
                return null;
            };
        }

        if (logicalType instanceof ArrayType) {
            LogicalType elementType = ((ArrayType) logicalType).getElementType();
            Function<Object, Object> elementConverter = converter(elementType);
            BiFunction<Integer, Function<Integer, Object>, Object> arrayConstructor =
                    arrayConstructor(elementType);
            return c -> {
                // Both java.util.List and native Java arrays are accepted; each is exposed to the
                // array constructor as a length plus an index-based getter of converted elements.
                int length = -1;
                Function<Integer, Object> elementGetter = null;
                if (c instanceof List) {
                    length = ((List<?>) c).size();
                    elementGetter = i -> elementConverter.apply(((List<?>) c).get(i));
                }
                if (c != null && c.getClass().isArray()) {
                    length = Array.getLength(c);
                    elementGetter = i -> elementConverter.apply(Array.get(c, i));
                }
                if (elementGetter != null) {
                    return arrayConstructor.apply(length, elementGetter);
                }
                return null;
            };
        }

        if (logicalType instanceof MultisetType) {
            // Multiset values are passed through without any conversion.
            return c -> c;
        }

        if (logicalType instanceof MapType) {
            Function<Object, Object> keyConverter = converter(((MapType) logicalType).getKeyType());
            Function<Object, Object> valueConverter =
                    converter(((MapType) logicalType).getValueType());

            return c -> {
                if (c instanceof Map) {
                    // The map is built by hand instead of with Collectors.toMap because toMap
                    // throws a NullPointerException for null values, and the value converter
                    // legitimately returns null for null or unsupported inputs.
                    Map<Object, Object> mapData = new java.util.HashMap<>();
                    for (Map.Entry<?, ?> entry : ((Map<?, ?>) c).entrySet()) {
                        Object key = keyConverter.apply(entry.getKey());
                        // Keep failing on keys that collide after conversion, as toMap did.
                        if (mapData.containsKey(key)) {
                            throw new IllegalStateException("Duplicate key " + key);
                        }
                        mapData.put(key, valueConverter.apply(entry.getValue()));
                    }
                    return new GenericMapData(mapData);
                } else {
                    return null;
                }
            };
        }

        if (logicalType instanceof RowType) {
            LogicalType[] fieldTypes = logicalType.getChildren().toArray(new LogicalType[0]);
            List<Function<Object, Object>> fieldConverters =
                    Arrays.stream(fieldTypes)
                            .map(PythonTableUtils::converter)
                            .collect(Collectors.toList());
            return c -> {
                if (c != null && c.getClass().isArray()) {
                    // Element 0 of the input array carries the row kind; the field values start
                    // at index 1, hence the "length - 1" arithmetic below.
                    int length = Array.getLength(c);
                    if (length - 1 != fieldTypes.length) {
                        throw new IllegalStateException(
                                "Input row doesn't have expected number of values required by the schema. "
                                        + fieldTypes.length
                                        + " fields are required while "
                                        + (length - 1)
                                        + " values are provided.");
                    }

                    GenericRowData row = new GenericRowData(length - 1);
                    row.setRowKind(RowKind.fromByteValue(((Number) Array.get(c, 0)).byteValue()));

                    for (int i = 0; i < row.getArity(); i++) {
                        row.setField(i, fieldConverters.get(i).apply(Array.get(c, i + 1)));
                    }

                    return row;
                }
                return null;
            };
        } else if (logicalType instanceof StructuredType) {
            // Only structured types implemented by ListView or MapView are supported; they are
            // converted using the converter of their first child type.
            Optional<Class<?>> implClass = ((StructuredType) logicalType).getImplementationClass();
            if (implClass.isPresent()
                    && (implClass.get() == ListView.class || implClass.get() == MapView.class)) {
                return converter(logicalType.getChildren().get(0));
            }
            throw new IllegalStateException(
                    "Failed to get the data converter for StructuredType with implementation "
                            + "class: "
                            + implClass.orElse(null));
        }

        throw new IllegalStateException("Failed to get converter for LogicalType: " + logicalType);
    }