in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java [129:293]
/**
 * Builds a Calcite struct type containing one field per CDC column, in list order.
 *
 * <p>The column's type root selects the Calcite {@link SqlTypeName}. Precision, length and scale
 * are forwarded for the types that carry them, and the nullability of every scalar column is
 * copied from the CDC type. ROW, ARRAY and MAP columns are always registered as nullable,
 * regardless of the nullability declared on the CDC type; their nested types are converted via
 * {@code convertCalciteType}.
 *
 * <p>{@code TIMESTAMP_WITH_TIME_ZONE} is registered as a plain {@code TIMESTAMP}, so the zone
 * component is not represented in the resulting Calcite type.
 *
 * @param typeFactory Calcite factory used to create the field types and the final struct type
 * @param columns CDC columns to convert
 * @return a Calcite row type whose fields mirror {@code columns}
 * @throws UnsupportedOperationException if a column has a type root not handled here
 */
public static RelDataType convertCalciteRelDataType(
        RelDataTypeFactory typeFactory, List<Column> columns) {
    RelDataTypeFactory.Builder fieldInfoBuilder = typeFactory.builder();
    for (Column column : columns) {
        final String name = column.getName();
        final DataType columnType = column.getType();
        switch (columnType.getTypeRoot()) {
            case BOOLEAN:
                BooleanType booleanType = (BooleanType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.BOOLEAN)
                        .nullable(booleanType.isNullable());
                break;
            case TINYINT:
                TinyIntType tinyIntType = (TinyIntType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.TINYINT)
                        .nullable(tinyIntType.isNullable());
                break;
            case SMALLINT:
                SmallIntType smallIntType = (SmallIntType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.SMALLINT)
                        .nullable(smallIntType.isNullable());
                break;
            case INTEGER:
                IntType intType = (IntType) columnType;
                fieldInfoBuilder.add(name, SqlTypeName.INTEGER).nullable(intType.isNullable());
                break;
            case BIGINT:
                BigIntType bigIntType = (BigIntType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.BIGINT)
                        .nullable(bigIntType.isNullable());
                break;
            case DATE:
                DateType dateType = (DateType) columnType;
                fieldInfoBuilder.add(name, SqlTypeName.DATE).nullable(dateType.isNullable());
                break;
            case TIME_WITHOUT_TIME_ZONE:
                // A zone-less TIME maps to Calcite's plain TIME, mirroring how
                // TIMESTAMP_WITHOUT_TIME_ZONE maps to TIMESTAMP below. The previous mapping to
                // TIME_WITH_LOCAL_TIME_ZONE attached local-zone semantics the CDC type lacks.
                // NOTE(review): confirm that convertCalciteType and the Calcite-to-CDC reverse
                // conversion treat SqlTypeName.TIME the same way.
                TimeType timeType = (TimeType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.TIME, timeType.getPrecision())
                        .nullable(timeType.isNullable());
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                TimestampType timestampType = (TimestampType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.TIMESTAMP, timestampType.getPrecision())
                        .nullable(timestampType.isNullable());
                break;
            case TIMESTAMP_WITH_TIME_ZONE:
                // Registered as a plain TIMESTAMP; only precision and nullability carry over.
                ZonedTimestampType zonedTimestampType = (ZonedTimestampType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.TIMESTAMP, zonedTimestampType.getPrecision())
                        .nullable(zonedTimestampType.isNullable());
                break;
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                LocalZonedTimestampType localZonedTimestampType =
                        (LocalZonedTimestampType) columnType;
                fieldInfoBuilder
                        .add(
                                name,
                                SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                                localZonedTimestampType.getPrecision())
                        .nullable(localZonedTimestampType.isNullable());
                break;
            case FLOAT:
                FloatType floatType = (FloatType) columnType;
                fieldInfoBuilder.add(name, SqlTypeName.FLOAT).nullable(floatType.isNullable());
                break;
            case DOUBLE:
                DoubleType doubleType = (DoubleType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.DOUBLE)
                        .nullable(doubleType.isNullable());
                break;
            case CHAR:
                CharType charType = (CharType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.CHAR, charType.getLength())
                        .nullable(charType.isNullable());
                break;
            case VARCHAR:
                VarCharType varCharType = (VarCharType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.VARCHAR, varCharType.getLength())
                        .nullable(varCharType.isNullable());
                break;
            case BINARY:
                BinaryType binaryType = (BinaryType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.BINARY, binaryType.getLength())
                        .nullable(binaryType.isNullable());
                break;
            case VARBINARY:
                VarBinaryType varBinaryType = (VarBinaryType) columnType;
                fieldInfoBuilder
                        .add(name, SqlTypeName.VARBINARY, varBinaryType.getLength())
                        .nullable(varBinaryType.isNullable());
                break;
            case DECIMAL:
                DecimalType decimalType = (DecimalType) columnType;
                fieldInfoBuilder
                        .add(
                                name,
                                SqlTypeName.DECIMAL,
                                decimalType.getPrecision(),
                                decimalType.getScale())
                        .nullable(decimalType.isNullable());
                break;
            case ROW:
                RowType rowType = (RowType) columnType;
                List<RelDataType> rowFieldTypes =
                        rowType.getFieldTypes().stream()
                                .map(fieldType -> convertCalciteType(typeFactory, fieldType))
                                .collect(Collectors.toList());
                // Nested types are always registered as nullable.
                fieldInfoBuilder
                        .add(
                                name,
                                typeFactory.createStructType(
                                        rowFieldTypes, rowType.getFieldNames()))
                        .nullable(true);
                break;
            case ARRAY:
                DataType elementType = ((ArrayType) columnType).getElementType();
                // -1 is passed as the maximum cardinality of the array type.
                fieldInfoBuilder
                        .add(
                                name,
                                typeFactory.createArrayType(
                                        convertCalciteType(typeFactory, elementType), -1))
                        .nullable(true);
                break;
            case MAP:
                MapType mapType = (MapType) columnType;
                RelDataType keyType = convertCalciteType(typeFactory, mapType.getKeyType());
                RelDataType valueType = convertCalciteType(typeFactory, mapType.getValueType());
                fieldInfoBuilder
                        .add(name, typeFactory.createMapType(keyType, valueType))
                        .nullable(true);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + columnType);
        }
    }
    return fieldInfoBuilder.build();
}