private void resolveColumnDataType()

in seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java [141:302]


    /**
     * Populates {@code columnEditor} with the type information carried by a parsed column data
     * type: type name, JDBC type, length, scale, charset and, for ENUM/SET, the option values.
     *
     * <p>Dimensions written explicitly in the DDL are applied first; length and scale fall back
     * to the values of the resolved {@link DataType} only when the DDL did not supply them.
     *
     * @param dataTypeContext parser context of the column's data type
     */
    private void resolveColumnDataType(MySqlParser.DataTypeContext dataTypeContext) {
        String charsetName = null;
        DataType dataType = dataTypeResolver.resolveDataType(dataTypeContext);

        if (dataTypeContext instanceof MySqlParser.StringDataTypeContext) {
            // Same as LongVarcharDataTypeContext but with dimension handling
            MySqlParser.StringDataTypeContext stringDataTypeContext =
                    (MySqlParser.StringDataTypeContext) dataTypeContext;

            applyOneDimensionLength(stringDataTypeContext.lengthOneDimension());

            charsetName =
                    parser.extractCharset(
                            stringDataTypeContext.charsetName(),
                            stringDataTypeContext.collationName());
        } else if (dataTypeContext instanceof MySqlParser.LongVarcharDataTypeContext) {
            // Same as StringDataTypeContext but without dimension handling
            MySqlParser.LongVarcharDataTypeContext longVarcharTypeContext =
                    (MySqlParser.LongVarcharDataTypeContext) dataTypeContext;

            charsetName =
                    parser.extractCharset(
                            longVarcharTypeContext.charsetName(),
                            longVarcharTypeContext.collationName());
        } else if (dataTypeContext instanceof MySqlParser.NationalStringDataTypeContext) {
            // National types only carry a length here; their charset is forced further below.
            applyOneDimensionLength(
                    ((MySqlParser.NationalStringDataTypeContext) dataTypeContext)
                            .lengthOneDimension());
        } else if (dataTypeContext instanceof MySqlParser.NationalVaryingStringDataTypeContext) {
            applyOneDimensionLength(
                    ((MySqlParser.NationalVaryingStringDataTypeContext) dataTypeContext)
                            .lengthOneDimension());
        } else if (dataTypeContext instanceof MySqlParser.DimensionDataTypeContext) {
            MySqlParser.DimensionDataTypeContext dimensionDataTypeContext =
                    (MySqlParser.DimensionDataTypeContext) dataTypeContext;

            // Collected first and applied once at the end of this branch, so that a later
            // dimension form overrides an earlier one.
            Integer length = null;
            Integer scale = null;
            if (dimensionDataTypeContext.lengthOneDimension() != null) {
                length =
                        parseLength(
                                dimensionDataTypeContext
                                        .lengthOneDimension()
                                        .decimalLiteral()
                                        .getText());
            }

            if (dimensionDataTypeContext.lengthTwoDimension() != null) {
                // Two mandatory literals: (length, scale)
                List<MySqlParser.DecimalLiteralContext> decimalLiterals =
                        dimensionDataTypeContext.lengthTwoDimension().decimalLiteral();
                length = parseLength(decimalLiterals.get(0).getText());
                scale = Integer.valueOf(decimalLiterals.get(1).getText());
            }

            if (dimensionDataTypeContext.lengthTwoOptionalDimension() != null) {
                // One or two literals: (length) or (length, scale)
                List<MySqlParser.DecimalLiteralContext> decimalLiterals =
                        dimensionDataTypeContext.lengthTwoOptionalDimension().decimalLiteral();
                MySqlParser.DecimalLiteralContext lengthLiteral = decimalLiterals.get(0);
                if (lengthLiteral.REAL_LITERAL() != null) {
                    // The length was written as a real literal; only the part before the dot
                    // is used as the length.
                    String[] digits = DOT.split(lengthLiteral.getText());
                    if (Strings.isNullOrEmpty(digits[0]) || Integer.parseInt(digits[0]) == 0) {
                        // Set default value 10 according to the MySQL engine
                        length = 10;
                    } else {
                        length = parseLength(digits[0]);
                    }
                } else {
                    length = parseLength(lengthLiteral.getText());
                }

                if (decimalLiterals.size() > 1) {
                    scale = Integer.valueOf(decimalLiterals.get(1).getText());
                }
            }
            if (length != null) {
                columnEditor.length(length);
            }
            if (scale != null) {
                columnEditor.scale(scale);
            }
        } else if (dataTypeContext instanceof MySqlParser.CollectionDataTypeContext) {
            MySqlParser.CollectionDataTypeContext collectionDataTypeContext =
                    (MySqlParser.CollectionDataTypeContext) dataTypeContext;
            if (collectionDataTypeContext.charsetName() != null) {
                charsetName = collectionDataTypeContext.charsetName().getText();
            }

            if (dataType.name().equalsIgnoreCase("SET")) {
                // After DBZ-132, it will always be comma separated
                int optionsSize =
                        collectionDataTypeContext.collectionOptions().collectionOption().size();
                columnEditor.length(
                        Math.max(0, optionsSize * 2 - 1)); // number of options + number of commas
            } else {
                columnEditor.length(1);
            }
        }

        // Locale.ROOT keeps the upper-casing independent of the JVM default locale; with e.g. a
        // Turkish default locale a lowercase 'i' would otherwise become a dotted capital I and
        // the comparisons below would not match.
        String dataTypeName = dataType.name().toUpperCase(java.util.Locale.ROOT);

        if (dataTypeName.equals("ENUM") || dataTypeName.equals("SET")) {
            // type expression has to be set, because the value converter needs to know the enum or
            // set options
            MySqlParser.CollectionDataTypeContext collectionDataTypeContext =
                    (MySqlParser.CollectionDataTypeContext) dataTypeContext;

            List<String> collectionOptions =
                    collectionDataTypeContext.collectionOptions().collectionOption().stream()
                            .map(AntlrDdlParser::getText)
                            .collect(Collectors.toList());

            columnEditor.type(dataTypeName);
            columnEditor.enumValues(collectionOptions);
        } else if (dataTypeName.equals("SERIAL")) {
            // SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE
            columnEditor.type("BIGINT UNSIGNED");
            serialColumn();
        } else {
            columnEditor.type(dataTypeName);
        }

        int jdbcDataType = dataType.jdbcType();
        columnEditor.jdbcType(jdbcDataType);

        // Fall back to the resolved data type's length/scale when the DDL did not set them.
        if (columnEditor.length() == -1) {
            columnEditor.length((int) dataType.length());
        }
        if (!columnEditor.scale().isPresent() && dataType.scale() != Column.UNSET_INT_VALUE) {
            columnEditor.scale(dataType.scale());
        }
        if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) {
            // NCHAR and NVARCHAR columns always use utf8 as charset
            columnEditor.charsetName("utf8");
        } else {
            columnEditor.charsetName(charsetName);
        }
    }

    /**
     * Sets the column length from an explicit single-dimension declaration, if one is present.
     *
     * @param lengthContext the one-dimension length context, or {@code null} when the DDL did
     *     not declare a length
     */
    private void applyOneDimensionLength(MySqlParser.LengthOneDimensionContext lengthContext) {
        if (lengthContext == null) {
            return;
        }
        Integer length = parseLength(lengthContext.decimalLiteral().getText());
        columnEditor.length(length);
    }