public static FlinkDataType convertorToFlinkDataType()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/DataTypeConverterUtils.java [138:195]


    public static FlinkDataType convertorToFlinkDataType(DataType dataType) {
        LogicalType logicalType = dataType.getLogicalType();
        boolean isNullable = dataType.getLogicalType().isNullable();
        String typeName = logicalType.getTypeRoot().name().toLowerCase();
        Integer precision = null;
        Integer scale = null;

        switch (logicalType.getTypeRoot()) {
            case CHAR:
            case VARCHAR:
                if (logicalType instanceof CharType) {
                    precision = ((CharType) logicalType).getLength();
                } else if (logicalType instanceof VarCharType) {
                    precision = ((VarCharType) logicalType).getLength();
                }
                break;
            case DECIMAL:
                if (logicalType instanceof DecimalType) {
                    precision = ((DecimalType) logicalType).getPrecision();
                    scale = ((DecimalType) logicalType).getScale();
                }
                break;
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
            case DATE:
            case TIME_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            case BOOLEAN:
            case BINARY:
            case VARBINARY:
            case ARRAY:
            case MULTISET:
            case MAP:
            case ROW:
            case RAW:
            case NULL:
            case SYMBOL:
            case UNRESOLVED:
            case INTERVAL_YEAR_MONTH:
            case INTERVAL_DAY_TIME:
            case TIMESTAMP_WITH_TIME_ZONE:
            case DISTINCT_TYPE:
            case STRUCTURED_TYPE:
                // case JSON:
                // case UUID:
                // These types do not have precision or scale
                break;
            default:
                throw new IllegalArgumentException("Unsupported type: " + logicalType);
        }

        return new FlinkDataType(typeName, isNullable, precision, scale);
    }