in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala [547:662]
/**
 * Maps a [[RelDataType]] to the matching [[LogicalType]], dispatching on the SQL type name and,
 * where a case requires it, on the concrete class of the given type.
 *
 * The nullability of the result is always taken from `relDataType.isNullable`.
 *
 * @param relDataType
 *   the relational type to convert
 * @return
 *   the logical type, carrying the nullability of `relDataType`
 * @throws TableException
 *   for a processing-time indicator on TIMESTAMP, for a TIME or day-interval precision greater
 *   than 3, and for any type that has no mapping
 */
def toLogicalType(relDataType: RelDataType): LogicalType = {
  // Pairing the type name with the instance lets class-specific cases use typed patterns.
  val convertedType = (relDataType.getSqlTypeName, relDataType) match {
    case (BOOLEAN, _) => new BooleanType()
    case (TINYINT, _) => new TinyIntType()
    case (SMALLINT, _) => new SmallIntType()
    case (INTEGER, _) => new IntType()
    case (BIGINT, _) => new BigIntType()
    case (FLOAT, _) => new FloatType()
    case (DOUBLE, _) => new DoubleType()

    // character and binary strings: a precision of 0 maps to the empty-literal variant
    case (CHAR, _) =>
      relDataType.getPrecision match {
        case 0 => CharType.ofEmptyLiteral()
        case length => new CharType(length)
      }
    case (VARCHAR, _) =>
      relDataType.getPrecision match {
        case 0 => VarCharType.ofEmptyLiteral()
        case length => new VarCharType(length)
      }
    case (BINARY, _) =>
      relDataType.getPrecision match {
        case 0 => BinaryType.ofEmptyLiteral()
        case length => new BinaryType(length)
      }
    case (VARBINARY, _) =>
      relDataType.getPrecision match {
        case 0 => VarBinaryType.ofEmptyLiteral()
        case length => new VarBinaryType(length)
      }

    case (DECIMAL, _) =>
      new DecimalType(relDataType.getPrecision, relDataType.getScale)

    // time indicators; these must precede the plain TIMESTAMP cases further down
    case (TIMESTAMP, indicator: TimeIndicatorRelDataType) =>
      if (!indicator.isEventTime) {
        throw new TableException(
          s"Processing time indicator only supports" +
            s" LocalZonedTimestampType, but actual is TimestampType." +
            s" This is a bug in planner, please file an issue.")
      }
      new TimestampType(true, TimestampKind.ROWTIME, 3)
    case (TIMESTAMP_WITH_LOCAL_TIME_ZONE, indicator: TimeIndicatorRelDataType) =>
      val timeKind =
        if (indicator.isEventTime) TimestampKind.ROWTIME else TimestampKind.PROCTIME
      new LocalZonedTimestampType(true, timeKind, 3)

    // temporal types
    case (DATE, _) => new DateType()
    case (TIME, _) =>
      if (relDataType.getPrecision <= 3) {
        // The planner itself supports precision 3; for consistency with the old planner
        // the type is created with precision 0.
        new TimeType()
      } else {
        throw new TableException(s"TIME precision is not supported: ${relDataType.getPrecision}")
      }
    case (TIMESTAMP, _) =>
      new TimestampType(relDataType.getPrecision)
    case (TIMESTAMP_WITH_LOCAL_TIME_ZONE, _) =>
      new LocalZonedTimestampType(relDataType.getPrecision)

    // interval types
    case (typeName, _) if YEAR_INTERVAL_TYPES.contains(typeName) =>
      DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType
    case (typeName, _) if DAY_INTERVAL_TYPES.contains(typeName) =>
      if (relDataType.getPrecision <= 3) {
        DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType
      } else {
        throw new TableException(
          s"DAY_INTERVAL_TYPES precision is not supported: ${relDataType.getPrecision}")
      }

    case (NULL, _) => new NullType()
    case (SYMBOL, _) => new SymbolType()
    case (COLUMN_LIST, _) => new DescriptorType()

    // unwrap the logical type carried by the wrapper
    case (ANY, generic: GenericRelDataType) =>
      generic.genericType
    case (ROW, _: RelRecordType) =>
      toLogicalRowType(relDataType)
    case (STRUCTURED, structured: StructuredRelDataType) =>
      structured.getStructuredType

    // collection types convert their element types recursively
    case (MULTISET, _) =>
      new MultisetType(toLogicalType(relDataType.getComponentType))
    case (ARRAY, _) =>
      new ArrayType(toLogicalType(relDataType.getComponentType))
    case (MAP, mapSqlType: MapSqlType) =>
      val keyType = toLogicalType(mapSqlType.getKeyType)
      val valueType = toLogicalType(mapSqlType.getValueType)
      new MapType(keyType, valueType)

    // CURSOR shows up for UDTFs; its type info is never used, so this is only a placeholder
    case (CURSOR, _) =>
      new TypeInformationRawType[Nothing](new NothingTypeInfo)
    case (OTHER, raw: RawRelDataType) =>
      raw.getRawType

    // anything else, including a guarded case whose class did not match, is rejected
    case (t, _) =>
      throw new TableException(s"Type is not supported: $t")
  }

  // The branches above ignore nullability; apply the source type's nullability once here.
  convertedType.copy(relDataType.isNullable)
}