public static DataType convertToDataType()

in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/DataTypeConverterUtils.java [31:136]


    public static DataType convertToDataType(FlinkDataType flinkDataType) {
        DataType dataType;

        String type = flinkDataType.getType().toLowerCase();
        boolean isNullable = flinkDataType.isNullable();
        Integer precision = flinkDataType.getPrecision();
        Integer scale = flinkDataType.getScale();

        switch (type) {
            // Integer types
            case "tinyint":
            case "int1":
                dataType = DataTypes.TINYINT();
                break;
            case "smallint":
            case "int2":
                dataType = DataTypes.SMALLINT();
                break;
            case "int":
            case "integer":
            case "int4":
                dataType = DataTypes.INT();
                break;
            case "bigint":
            case "int8":
                dataType = DataTypes.BIGINT();
                break;

            // Floating-point types
            case "float":
            case "real":
                dataType = DataTypes.FLOAT();
                break;
            case "double":
            case "float8":
                dataType = DataTypes.DOUBLE();
                break;

            // Decimal and Numeric types
            case "decimal":
            case "numeric":
                if (precision != null && scale != null) {
                    dataType = DataTypes.DECIMAL(precision, scale);
                } else {
                    dataType = DataTypes.DECIMAL(38, 18); // Default precision and scale
                }
                break;

            // Character types
            case "char":
                if (precision != null) {
                    dataType = DataTypes.CHAR(precision);
                } else {
                    dataType = DataTypes.CHAR(1); // Default size
                }
                break;
            case "varchar":
            case "string":
            case "text":
                dataType = DataTypes.STRING();
                break;

            // Binary data types
            case "binary":
            case "varbinary":
            case "blob":
                dataType = DataTypes.BYTES();
                break;

            // Date and time types
            case "date":
                dataType = DataTypes.DATE();
                break;
            case "timestamp":
                if (precision != null) {
                    dataType = DataTypes.TIMESTAMP(precision);
                } else {
                    dataType = DataTypes.TIMESTAMP(3); // Default precision
                }
                break;
            case "time":
                dataType = DataTypes.TIME();
                break;

            // Boolean type
            case "boolean":
            case "bool":
                dataType = DataTypes.BOOLEAN();
                break;

            // JSON and other types
            case "json":
                dataType = DataTypes.STRING(); // JSON as STRING in Flink
                break;
            case "uuid":
                dataType = DataTypes.STRING(); // UUID as STRING
                break;

            // Default case for unsupported types
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }

        // Apply nullability
        return isNullable ? dataType.nullable() : dataType.notNull();
    }