in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/typeutils/DataTypeConverter.java [295:368]
/**
 * Translates a {@link DataType} into the Calcite {@link RelDataType} used to represent it.
 *
 * <p>The mapping is driven solely by {@code dataType.getTypeRoot()}. Length, precision and
 * scale are forwarded to the factory where the source type carries them; ROW, ARRAY and MAP
 * types are converted recursively. The nullability of {@code dataType} is not consulted by
 * this method.
 *
 * @param typeFactory factory used to create every resulting Calcite type
 * @param dataType the type to translate
 * @return the Calcite type produced by {@code typeFactory} for {@code dataType}
 * @throws UnsupportedOperationException if the type root is TIMESTAMP_WITH_TIME_ZONE or is not
 *     handled by any branch below
 */
public static RelDataType convertCalciteType(
        RelDataTypeFactory typeFactory, DataType dataType) {
    switch (dataType.getTypeRoot()) {
            // ---- Types that carry no parameters ----
        case BOOLEAN:
            return typeFactory.createSqlType(SqlTypeName.BOOLEAN);
        case TINYINT:
            return typeFactory.createSqlType(SqlTypeName.TINYINT);
        case SMALLINT:
            return typeFactory.createSqlType(SqlTypeName.SMALLINT);
        case INTEGER:
            return typeFactory.createSqlType(SqlTypeName.INTEGER);
        case BIGINT:
            return typeFactory.createSqlType(SqlTypeName.BIGINT);
        case FLOAT:
            return typeFactory.createSqlType(SqlTypeName.FLOAT);
        case DOUBLE:
            return typeFactory.createSqlType(SqlTypeName.DOUBLE);
        case DATE:
            return typeFactory.createSqlType(SqlTypeName.DATE);

            // ---- Character and binary types: the declared length is forwarded ----
        case CHAR:
            return typeFactory.createSqlType(
                    SqlTypeName.CHAR, ((CharType) dataType).getLength());
        case VARCHAR:
            return typeFactory.createSqlType(
                    SqlTypeName.VARCHAR, ((VarCharType) dataType).getLength());
        case BINARY:
            return typeFactory.createSqlType(
                    SqlTypeName.BINARY, ((BinaryType) dataType).getLength());
        case VARBINARY:
            return typeFactory.createSqlType(
                    SqlTypeName.VARBINARY, ((VarBinaryType) dataType).getLength());

            // ---- Exact numeric with precision and scale ----
        case DECIMAL:
            {
                DecimalType decimal = (DecimalType) dataType;
                return typeFactory.createSqlType(
                        SqlTypeName.DECIMAL, decimal.getPrecision(), decimal.getScale());
            }

            // ---- Temporal types: the declared precision is forwarded ----
        case TIME_WITHOUT_TIME_ZONE:
            // NOTE(review): a zone-less TIME is mapped to Calcite's
            // TIME_WITH_LOCAL_TIME_ZONE rather than TIME. Kept as-is; confirm this is
            // intentional before changing it, since other code may depend on it.
            return typeFactory.createSqlType(
                    SqlTypeName.TIME_WITH_LOCAL_TIME_ZONE,
                    ((TimeType) dataType).getPrecision());
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return typeFactory.createSqlType(
                    SqlTypeName.TIMESTAMP, ((TimestampType) dataType).getPrecision());
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return typeFactory.createSqlType(
                    SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                    ((LocalZonedTimestampType) dataType).getPrecision());
        case TIMESTAMP_WITH_TIME_ZONE:
            // TODO: Bump Calcite to support its TIMESTAMP_TZ type via #FLINK-37123
            throw new UnsupportedOperationException("Unsupported type: TIMESTAMP_TZ");

            // ---- Nested types: children are converted recursively ----
        case ROW:
            {
                RowType rowType = (RowType) dataType;
                List<RelDataType> fieldTypes =
                        rowType.getFieldTypes().stream()
                                .map(fieldType -> convertCalciteType(typeFactory, fieldType))
                                .collect(Collectors.toList());
                return typeFactory.createStructType(fieldTypes, rowType.getFieldNames());
            }
        case ARRAY:
            {
                RelDataType componentType =
                        convertCalciteType(
                                typeFactory, ((ArrayType) dataType).getElementType());
                // -1 is passed as the maximum cardinality, i.e. no upper bound is declared.
                return typeFactory.createArrayType(componentType, -1);
            }
        case MAP:
            {
                MapType mapType = (MapType) dataType;
                // Key is converted before value, matching the declaration order.
                RelDataType convertedKey =
                        convertCalciteType(typeFactory, mapType.getKeyType());
                RelDataType convertedValue =
                        convertCalciteType(typeFactory, mapType.getValueType());
                return typeFactory.createMapType(convertedKey, convertedValue);
            }

        default:
            throw new UnsupportedOperationException("Unsupported type: " + dataType);
    }
}