in flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/main/java/org/apache/flink/cdc/connectors/sqlserver/source/utils/SqlServerTypeUtils.java [47:85]
private static DataType convertFromColumn(Column column) {
switch (column.jdbcType()) {
case Types.CHAR:
case Types.VARCHAR:
case Types.NCHAR:
case Types.NVARCHAR:
case Types.STRUCT:
case Types.CLOB:
return DataTypes.STRING();
case Types.BLOB:
return DataTypes.BYTES();
case Types.INTEGER:
case Types.SMALLINT:
case Types.TINYINT:
return DataTypes.INT();
case Types.BIGINT:
return DataTypes.BIGINT();
case Types.FLOAT:
case Types.REAL:
case Types.DOUBLE:
case Types.NUMERIC:
case Types.DECIMAL:
return DataTypes.DECIMAL(column.length(), column.scale().orElse(0));
case Types.DATE:
return DataTypes.DATE();
case Types.TIMESTAMP:
case Types.TIMESTAMP_WITH_TIMEZONE:
return column.scale().isPresent()
? DataTypes.TIMESTAMP(column.scale().get())
: DataTypes.TIMESTAMP();
case Types.BOOLEAN:
return DataTypes.BOOLEAN();
default:
throw new UnsupportedOperationException(
String.format(
"Don't support SqlSever type '%s' yet, jdbcType:'%s'.",
column.typeName(), column.jdbcType()));
}
}