public static DataFormatConverter getConverterForDataType()

in flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java [167:355]


    public static DataFormatConverter getConverterForDataType(DataType originDataType) {
        // Resolves the converter for a data type. The lookup works on the nullable variant of
        // the given type: a converter registered in TYPE_TO_CONVERTER wins; otherwise the choice
        // is driven by the logical type root together with the conversion class.
        final DataType dataType = originDataType.nullable();
        final DataFormatConverter registered = TYPE_TO_CONVERTER.get(dataType);
        if (registered != null) {
            return registered;
        }

        final Class<?> clazz = dataType.getConversionClass();
        final LogicalType logicalType = dataType.getLogicalType();
        switch (logicalType.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
                if (clazz == String.class) {
                    return StringConverter.INSTANCE;
                }
                if (clazz == StringData.class) {
                    return StringDataConverter.INSTANCE;
                }
                throw new RuntimeException("Not support class for VARCHAR: " + clazz);
            case BINARY:
            case VARBINARY:
                // The conversion class is not inspected for binary types.
                return PrimitiveByteArrayConverter.INSTANCE;
            case DECIMAL:
                // f0 / f1 are passed on in this order to the decimal converters.
                final Tuple2<Integer, Integer> precisionAndScale = getPrecision(logicalType);
                if (clazz == BigDecimal.class) {
                    return new BigDecimalConverter(precisionAndScale.f0, precisionAndScale.f1);
                }
                if (clazz == DecimalData.class) {
                    return new DecimalDataConverter(precisionAndScale.f0, precisionAndScale.f1);
                }
                throw new RuntimeException("Not support conversion class for DECIMAL: " + clazz);
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                final int timestampPrecision = getDateTimePrecision(logicalType);
                if (clazz == Timestamp.class) {
                    return new TimestampConverter(timestampPrecision);
                }
                if (clazz == LocalDateTime.class) {
                    return new LocalDateTimeConverter(timestampPrecision);
                }
                if (clazz == TimestampData.class) {
                    return new TimestampDataConverter(timestampPrecision);
                }
                throw new RuntimeException(
                        "Not support conversion class for TIMESTAMP WITHOUT TIME ZONE: " + clazz);
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                final int localZonedPrecision = getDateTimePrecision(logicalType);
                if (clazz == Instant.class) {
                    return new InstantConverter(localZonedPrecision);
                }
                if (clazz == Long.class || clazz == long.class) {
                    return new LongTimestampDataConverter(localZonedPrecision);
                }
                if (clazz == TimestampData.class) {
                    return new TimestampDataConverter(localZonedPrecision);
                }
                if (clazz == Timestamp.class) {
                    return new TimestampLtzConverter(localZonedPrecision);
                }
                throw new RuntimeException(
                        "Not support conversion class for TIMESTAMP WITH LOCAL TIME ZONE: "
                                + clazz);
            case ARRAY:
                // Internal array data and the listed primitive arrays use shared instances.
                if (clazz == ArrayData.class) {
                    return ArrayDataConverter.INSTANCE;
                }
                if (clazz == boolean[].class) {
                    return PrimitiveBooleanArrayConverter.INSTANCE;
                }
                if (clazz == short[].class) {
                    return PrimitiveShortArrayConverter.INSTANCE;
                }
                if (clazz == int[].class) {
                    return PrimitiveIntArrayConverter.INSTANCE;
                }
                if (clazz == long[].class) {
                    return PrimitiveLongArrayConverter.INSTANCE;
                }
                if (clazz == float[].class) {
                    return PrimitiveFloatArrayConverter.INSTANCE;
                }
                if (clazz == double[].class) {
                    return PrimitiveDoubleArrayConverter.INSTANCE;
                }

                // Any other array class: derive the element type (from the collection data type,
                // or from the legacy array type information) and bridge it to the component
                // class of the conversion class.
                final DataType elementType;
                if (dataType instanceof CollectionDataType) {
                    elementType = ((CollectionDataType) dataType).getElementDataType();
                } else {
                    BasicArrayTypeInfo legacyArrayInfo =
                            (BasicArrayTypeInfo)
                                    ((LegacyTypeInformationType) dataType.getLogicalType())
                                            .getTypeInformation();
                    elementType = fromLegacyInfoToDataType(legacyArrayInfo.getComponentInfo());
                }
                return new ObjectArrayConverter(elementType.bridgedTo(clazz.getComponentType()));
            case MAP:
                if (clazz == MapData.class) {
                    return MapDataConverter.INSTANCE;
                }
                final KeyValueDataType mapType = (KeyValueDataType) dataType;
                return new MapConverter(mapType.getKeyDataType(), mapType.getValueDataType());
            case MULTISET:
                if (clazz == MapData.class) {
                    return MapDataConverter.INSTANCE;
                }
                // Converted like a map from the element type to an Integer-bridged INT value.
                final CollectionDataType multisetType = (CollectionDataType) dataType;
                return new MapConverter(
                        multisetType.getElementDataType(),
                        DataTypes.INT().bridgedTo(Integer.class));
            case ROW:
            case STRUCTURED_TYPE:
                final TypeInformation<?> rowTypeInfo = fromDataTypeToTypeInfo(dataType);
                if (rowTypeInfo instanceof InternalTypeInfo && clazz == RowData.class) {
                    final LogicalType internalRowType =
                            ((InternalTypeInfo<?>) rowTypeInfo).toLogicalType();
                    return new RowDataConverter(getFieldCount(internalRowType));
                }

                // legacy: everything below treats the type information as a composite type

                final CompositeType composite = (CompositeType) rowTypeInfo;
                // One DataType per field position 0 .. arity - 1, in field order.
                final DataType[] fieldDataTypes =
                        Stream.iterate(0, index -> index + 1)
                                .limit(composite.getArity())
                                .map((Function<Integer, TypeInformation>) composite::getTypeAt)
                                .map(TypeConversions::fromLegacyInfoToDataType)
                                .toArray(DataType[]::new);
                if (clazz == RowData.class) {
                    return new RowDataConverter(composite.getArity());
                }
                if (clazz == Row.class) {
                    return new RowConverter(fieldDataTypes);
                }
                if (Tuple.class.isAssignableFrom(clazz)) {
                    return new TupleConverter((Class<Tuple>) clazz, fieldDataTypes);
                }
                // PRODUCT_CLASS may be null, hence the null check before using it.
                if (CaseClassConverter.PRODUCT_CLASS != null
                        && CaseClassConverter.PRODUCT_CLASS.isAssignableFrom(clazz)) {
                    return new CaseClassConverter((TupleTypeInfoBase) composite, fieldDataTypes);
                }
                if (composite instanceof PojoTypeInfo) {
                    return new PojoConverter((PojoTypeInfo) composite, fieldDataTypes);
                }
                throw new IllegalStateException(
                        "Cannot find a converter for type "
                                + composite
                                + ". If the target should be a converter to scala.Product, then you might have a scala classpath issue.");
            case RAW:
                if (logicalType instanceof RawType) {
                    final RawType<?> rawType = (RawType<?>) logicalType;
                    if (clazz == RawValueData.class) {
                        return RawValueDataConverter.INSTANCE;
                    }
                    return new GenericConverter<>(rawType.getTypeSerializer());
                }

                // legacy: the logical type wraps a TypeInformation

                final TypeInformation legacyInfo;
                if (logicalType instanceof LegacyTypeInformationType) {
                    legacyInfo = ((LegacyTypeInformationType) logicalType).getTypeInformation();
                } else {
                    legacyInfo = ((TypeInformationRawType) logicalType).getTypeInformation();
                }

                // planner type info: these are checked before the conversion class
                if (legacyInfo instanceof StringDataTypeInfo) {
                    return StringDataConverter.INSTANCE;
                }
                if (legacyInfo instanceof DecimalDataTypeInfo) {
                    final DecimalDataTypeInfo decimalDataInfo = (DecimalDataTypeInfo) legacyInfo;
                    return new DecimalDataConverter(
                            decimalDataInfo.precision(), decimalDataInfo.scale());
                }
                if (legacyInfo instanceof BigDecimalTypeInfo) {
                    final BigDecimalTypeInfo bigDecimalInfo = (BigDecimalTypeInfo) legacyInfo;
                    return new BigDecimalConverter(
                            bigDecimalInfo.precision(), bigDecimalInfo.scale());
                }
                if (legacyInfo instanceof TimestampDataTypeInfo) {
                    final TimestampDataTypeInfo timestampDataInfo =
                            (TimestampDataTypeInfo) legacyInfo;
                    return new TimestampDataConverter(timestampDataInfo.getPrecision());
                }
                if (legacyInfo instanceof LegacyLocalDateTimeTypeInfo) {
                    final LegacyLocalDateTimeTypeInfo localDateTimeInfo =
                            (LegacyLocalDateTimeTypeInfo) legacyInfo;
                    return new LocalDateTimeConverter(localDateTimeInfo.getPrecision());
                }
                if (legacyInfo instanceof LegacyTimestampTypeInfo) {
                    final LegacyTimestampTypeInfo sqlTimestampInfo =
                            (LegacyTimestampTypeInfo) legacyInfo;
                    return new TimestampConverter(sqlTimestampInfo.getPrecision());
                }
                if (legacyInfo instanceof LegacyInstantTypeInfo) {
                    final LegacyInstantTypeInfo instantInfo = (LegacyInstantTypeInfo) legacyInfo;
                    return new InstantConverter(instantInfo.getPrecision());
                }

                if (clazz == RawValueData.class) {
                    return RawValueDataConverter.INSTANCE;
                }
                // Fallback: generic converter backed by the type information's own serializer.
                return new GenericConverter(
                        legacyInfo.createSerializer(new SerializerConfigImpl()));
            default:
                throw new RuntimeException("Not support dataType: " + dataType);
        }
    }