in seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/parser/CustomColumnDefinitionParserListener.java [141:302]
private void resolveColumnDataType(MySqlParser.DataTypeContext dataTypeContext) {
String charsetName = null;
DataType dataType = dataTypeResolver.resolveDataType(dataTypeContext);
if (dataTypeContext instanceof MySqlParser.StringDataTypeContext) {
// Same as LongVarcharDataTypeContext but with dimension handling
MySqlParser.StringDataTypeContext stringDataTypeContext =
(MySqlParser.StringDataTypeContext) dataTypeContext;
if (stringDataTypeContext.lengthOneDimension() != null) {
Integer length =
parseLength(
stringDataTypeContext
.lengthOneDimension()
.decimalLiteral()
.getText());
columnEditor.length(length);
}
charsetName =
parser.extractCharset(
stringDataTypeContext.charsetName(),
stringDataTypeContext.collationName());
} else if (dataTypeContext instanceof MySqlParser.LongVarcharDataTypeContext) {
// Same as StringDataTypeContext but without dimension handling
MySqlParser.LongVarcharDataTypeContext longVarcharTypeContext =
(MySqlParser.LongVarcharDataTypeContext) dataTypeContext;
charsetName =
parser.extractCharset(
longVarcharTypeContext.charsetName(),
longVarcharTypeContext.collationName());
} else if (dataTypeContext instanceof MySqlParser.NationalStringDataTypeContext) {
MySqlParser.NationalStringDataTypeContext nationalStringDataTypeContext =
(MySqlParser.NationalStringDataTypeContext) dataTypeContext;
if (nationalStringDataTypeContext.lengthOneDimension() != null) {
Integer length =
parseLength(
nationalStringDataTypeContext
.lengthOneDimension()
.decimalLiteral()
.getText());
columnEditor.length(length);
}
} else if (dataTypeContext instanceof MySqlParser.NationalVaryingStringDataTypeContext) {
MySqlParser.NationalVaryingStringDataTypeContext nationalVaryingStringDataTypeContext =
(MySqlParser.NationalVaryingStringDataTypeContext) dataTypeContext;
if (nationalVaryingStringDataTypeContext.lengthOneDimension() != null) {
Integer length =
parseLength(
nationalVaryingStringDataTypeContext
.lengthOneDimension()
.decimalLiteral()
.getText());
columnEditor.length(length);
}
} else if (dataTypeContext instanceof MySqlParser.DimensionDataTypeContext) {
MySqlParser.DimensionDataTypeContext dimensionDataTypeContext =
(MySqlParser.DimensionDataTypeContext) dataTypeContext;
Integer length = null;
Integer scale = null;
if (dimensionDataTypeContext.lengthOneDimension() != null) {
length =
parseLength(
dimensionDataTypeContext
.lengthOneDimension()
.decimalLiteral()
.getText());
}
if (dimensionDataTypeContext.lengthTwoDimension() != null) {
List<MySqlParser.DecimalLiteralContext> decimalLiterals =
dimensionDataTypeContext.lengthTwoDimension().decimalLiteral();
length = parseLength(decimalLiterals.get(0).getText());
scale = Integer.valueOf(decimalLiterals.get(1).getText());
}
if (dimensionDataTypeContext.lengthTwoOptionalDimension() != null) {
List<MySqlParser.DecimalLiteralContext> decimalLiterals =
dimensionDataTypeContext.lengthTwoOptionalDimension().decimalLiteral();
if (decimalLiterals.get(0).REAL_LITERAL() != null) {
String[] digits = DOT.split(decimalLiterals.get(0).getText());
if (Strings.isNullOrEmpty(digits[0]) || Integer.valueOf(digits[0]) == 0) {
// Set default value 10 according mysql engine
length = 10;
} else {
length = parseLength(digits[0]);
}
} else {
length = parseLength(decimalLiterals.get(0).getText());
}
if (decimalLiterals.size() > 1) {
scale = Integer.valueOf(decimalLiterals.get(1).getText());
}
}
if (length != null) {
columnEditor.length(length);
}
if (scale != null) {
columnEditor.scale(scale);
}
} else if (dataTypeContext instanceof MySqlParser.CollectionDataTypeContext) {
MySqlParser.CollectionDataTypeContext collectionDataTypeContext =
(MySqlParser.CollectionDataTypeContext) dataTypeContext;
if (collectionDataTypeContext.charsetName() != null) {
charsetName = collectionDataTypeContext.charsetName().getText();
}
if (dataType.name().equalsIgnoreCase("SET")) {
// After DBZ-132, it will always be comma separated
int optionsSize =
collectionDataTypeContext.collectionOptions().collectionOption().size();
columnEditor.length(
Math.max(0, optionsSize * 2 - 1)); // number of options + number of commas
} else {
columnEditor.length(1);
}
}
String dataTypeName = dataType.name().toUpperCase();
if (dataTypeName.equals("ENUM") || dataTypeName.equals("SET")) {
// type expression has to be set, because the value converter needs to know the enum or
// set options
MySqlParser.CollectionDataTypeContext collectionDataTypeContext =
(MySqlParser.CollectionDataTypeContext) dataTypeContext;
List<String> collectionOptions =
collectionDataTypeContext.collectionOptions().collectionOption().stream()
.map(AntlrDdlParser::getText)
.collect(Collectors.toList());
columnEditor.type(dataTypeName);
columnEditor.enumValues(collectionOptions);
} else if (dataTypeName.equals("SERIAL")) {
// SERIAL is an alias for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE
columnEditor.type("BIGINT UNSIGNED");
serialColumn();
} else {
columnEditor.type(dataTypeName);
}
int jdbcDataType = dataType.jdbcType();
columnEditor.jdbcType(jdbcDataType);
if (columnEditor.length() == -1) {
columnEditor.length((int) dataType.length());
}
if (!columnEditor.scale().isPresent() && dataType.scale() != Column.UNSET_INT_VALUE) {
columnEditor.scale(dataType.scale());
}
if (Types.NCHAR == jdbcDataType || Types.NVARCHAR == jdbcDataType) {
// NCHAR and NVARCHAR columns always uses utf8 as charset
columnEditor.charsetName("utf8");
} else {
columnEditor.charsetName(charsetName);
}
}