in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/util/DataTypeConverterUtils.java [138:195]
public static FlinkDataType convertorToFlinkDataType(DataType dataType) {
LogicalType logicalType = dataType.getLogicalType();
boolean isNullable = dataType.getLogicalType().isNullable();
String typeName = logicalType.getTypeRoot().name().toLowerCase();
Integer precision = null;
Integer scale = null;
switch (logicalType.getTypeRoot()) {
case CHAR:
case VARCHAR:
if (logicalType instanceof CharType) {
precision = ((CharType) logicalType).getLength();
} else if (logicalType instanceof VarCharType) {
precision = ((VarCharType) logicalType).getLength();
}
break;
case DECIMAL:
if (logicalType instanceof DecimalType) {
precision = ((DecimalType) logicalType).getPrecision();
scale = ((DecimalType) logicalType).getScale();
}
break;
case TINYINT:
case SMALLINT:
case INTEGER:
case BIGINT:
case FLOAT:
case DOUBLE:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
case BOOLEAN:
case BINARY:
case VARBINARY:
case ARRAY:
case MULTISET:
case MAP:
case ROW:
case RAW:
case NULL:
case SYMBOL:
case UNRESOLVED:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
case TIMESTAMP_WITH_TIME_ZONE:
case DISTINCT_TYPE:
case STRUCTURED_TYPE:
// case JSON:
// case UUID:
// These types do not have precision or scale
break;
default:
throw new IllegalArgumentException("Unsupported type: " + logicalType);
}
return new FlinkDataType(typeName, isNullable, precision, scale);
}