private static DataType convertFromColumn()

in flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java [126:249]


    /**
     * Maps a MySQL column to a {@link DataType}, dispatching on the column's type name.
     *
     * <p>Besides the type name, the mapping consults {@code column.length()} for the types
     * whose result depends on a declared length/precision, and {@code column.scale()} for the
     * decimal family.
     *
     * @param column the column to convert; its {@code typeName()}, {@code length()} and
     *     {@code scale()} are read
     * @param tinyInt1isBit when {@code true}, a {@code TINYINT} column whose length is 1 is
     *     mapped to {@code BOOLEAN} instead of {@code TINYINT}
     * @return the data type chosen for the column
     * @throws UnsupportedOperationException if the column's type name matches none of the
     *     handled cases
     */
    private static DataType convertFromColumn(Column column, boolean tinyInt1isBit) {
        final String mysqlType = column.typeName();
        switch (mysqlType) {
            case BIT:
                {
                    final int bitCount = column.length();
                    if (bitCount == 1) {
                        return DataTypes.BOOLEAN();
                    }
                    // Round the bit count up to a whole number of bytes.
                    return DataTypes.BINARY((bitCount + 7) / 8);
                }
            case BOOL:
            case BOOLEAN:
                return DataTypes.BOOLEAN();
            case TINYINT:
                // MySQL has no native boolean type and represents booleans as tinyint(1).
                // Users should not store numbers in tinyint(1); the JDBC URL parameter
                // tinyInt1isBit=false can change the returned value, but that is not a
                // general solution. Note that MyBatis and mysql-connector-java map
                // tinyint(1) to boolean by default.
                if (column.length() == 1 && tinyInt1isBit) {
                    return DataTypes.BOOLEAN();
                }
                return DataTypes.TINYINT();
            // From here on, each unsigned integer variant shares the result of the next
            // wider signed type.
            case TINYINT_UNSIGNED:
            case TINYINT_UNSIGNED_ZEROFILL:
            case SMALLINT:
                return DataTypes.SMALLINT();
            case SMALLINT_UNSIGNED:
            case SMALLINT_UNSIGNED_ZEROFILL:
            case INT:
            case INTEGER:
            case MEDIUMINT:
            case MEDIUMINT_UNSIGNED:
            case MEDIUMINT_UNSIGNED_ZEROFILL:
            case YEAR:
                return DataTypes.INT();
            case INT_UNSIGNED:
            case INT_UNSIGNED_ZEROFILL:
            case INTEGER_UNSIGNED:
            case INTEGER_UNSIGNED_ZEROFILL:
            case BIGINT:
                return DataTypes.BIGINT();
            case BIGINT_UNSIGNED:
            case BIGINT_UNSIGNED_ZEROFILL:
            case SERIAL:
                return DataTypes.DECIMAL(20, 0);
            case FLOAT:
            case FLOAT_UNSIGNED:
            case FLOAT_UNSIGNED_ZEROFILL:
                if (column.length() == FLOAT_LENGTH_UNSPECIFIED_FLAG) {
                    return DataTypes.FLOAT();
                }
                // A FLOAT declared with an explicit length is treated like DOUBLE.
                return DataTypes.DOUBLE();
            case REAL:
            case REAL_UNSIGNED:
            case REAL_UNSIGNED_ZEROFILL:
            case DOUBLE:
            case DOUBLE_UNSIGNED:
            case DOUBLE_UNSIGNED_ZEROFILL:
            case DOUBLE_PRECISION:
            case DOUBLE_PRECISION_UNSIGNED:
            case DOUBLE_PRECISION_UNSIGNED_ZEROFILL:
                return DataTypes.DOUBLE();
            case NUMERIC:
            case NUMERIC_UNSIGNED:
            case NUMERIC_UNSIGNED_ZEROFILL:
            case FIXED:
            case FIXED_UNSIGNED:
            case FIXED_UNSIGNED_ZEROFILL:
            case DECIMAL:
            case DECIMAL_UNSIGNED:
            case DECIMAL_UNSIGNED_ZEROFILL:
                {
                    final int precision = column.length();
                    // Lengths above 38 fall back to STRING (presumably the largest precision
                    // DataTypes.DECIMAL accepts -- confirm against DecimalType).
                    if (precision > 38) {
                        return DataTypes.STRING();
                    }
                    return DataTypes.DECIMAL(precision, column.scale().orElse(0));
                }
            case TIME:
                {
                    final int timePrecision = column.length();
                    // A negative length selects the no-argument TIME() factory.
                    if (timePrecision < 0) {
                        return DataTypes.TIME();
                    }
                    return DataTypes.TIME(timePrecision);
                }
            case DATE:
                return DataTypes.DATE();
            case DATETIME:
                // A negative length is replaced by precision 0.
                return DataTypes.TIMESTAMP(Math.max(column.length(), 0));
            case TIMESTAMP:
                // A negative length is replaced by precision 0.
                return DataTypes.TIMESTAMP_LTZ(Math.max(column.length(), 0));
            case CHAR:
                {
                    final int charLength = column.length();
                    if (charLength == 0) {
                        return CharType.ofEmptyLiteral();
                    }
                    // A negative length is replaced by 1.
                    return DataTypes.CHAR(charLength > 0 ? charLength : 1);
                }
            case VARCHAR:
                return DataTypes.VARCHAR(column.length());
            case TINYTEXT:
            case TEXT:
            case MEDIUMTEXT:
            case LONGTEXT:
            case JSON:
            case ENUM:
            case GEOMETRY:
            case POINT:
            case LINESTRING:
            case POLYGON:
            case GEOMETRYCOLLECTION:
            case GEOMCOLLECTION:
            case MULTIPOINT:
            case MULTIPOLYGON:
            case MULTILINESTRING:
                return DataTypes.STRING();
            case BINARY:
                {
                    final int byteLength = column.length();
                    if (byteLength == 0) {
                        return BinaryType.ofEmptyLiteral();
                    }
                    // A negative length is replaced by 1.
                    return DataTypes.BINARY(byteLength > 0 ? byteLength : 1);
                }
            case VARBINARY:
                return DataTypes.VARBINARY(column.length());
            case TINYBLOB:
            case BLOB:
            case MEDIUMBLOB:
            case LONGBLOB:
                return DataTypes.BYTES();
            case SET:
                return DataTypes.ARRAY(DataTypes.STRING());
            default:
                throw new UnsupportedOperationException(
                        String.format("MySQL type '%s' is not supported yet.", mysqlType));
        }
    }