in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala [72:192]
def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = {
def newRelDataType(): RelDataType = t.getTypeRoot match {
case LogicalTypeRoot.NULL => createSqlType(NULL)
case LogicalTypeRoot.BOOLEAN => createSqlType(BOOLEAN)
case LogicalTypeRoot.TINYINT => createSqlType(TINYINT)
case LogicalTypeRoot.SMALLINT => createSqlType(SMALLINT)
case LogicalTypeRoot.INTEGER => createSqlType(INTEGER)
case LogicalTypeRoot.BIGINT => createSqlType(BIGINT)
case LogicalTypeRoot.FLOAT => createSqlType(FLOAT)
case LogicalTypeRoot.DOUBLE => createSqlType(DOUBLE)
case LogicalTypeRoot.VARCHAR => createSqlType(VARCHAR, t.asInstanceOf[VarCharType].getLength)
case LogicalTypeRoot.CHAR => createSqlType(CHAR, t.asInstanceOf[CharType].getLength)
// temporal types
case LogicalTypeRoot.DATE => createSqlType(DATE)
case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE => createSqlType(TIME)
// interval types
case LogicalTypeRoot.INTERVAL_YEAR_MONTH =>
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
case LogicalTypeRoot.INTERVAL_DAY_TIME =>
createSqlIntervalType(
new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
case LogicalTypeRoot.BINARY => createSqlType(BINARY, t.asInstanceOf[BinaryType].getLength)
case LogicalTypeRoot.VARBINARY =>
createSqlType(VARBINARY, t.asInstanceOf[VarBinaryType].getLength)
case LogicalTypeRoot.DECIMAL =>
t match {
case decimalType: DecimalType =>
createSqlType(DECIMAL, decimalType.getPrecision, decimalType.getScale)
case legacyType: LegacyTypeInformationType[_]
if legacyType.getTypeInformation == BasicTypeInfo.BIG_DEC_TYPE_INFO =>
createSqlType(DECIMAL, 38, 18)
}
case LogicalTypeRoot.ROW =>
val rowType = t.asInstanceOf[RowType]
buildStructType(
rowType.getFieldNames,
rowType.getChildren,
// fields are not expanded in "SELECT *"
StructKind.PEEK_FIELDS_NO_EXPAND)
case LogicalTypeRoot.STRUCTURED_TYPE =>
t match {
case structuredType: StructuredType => StructuredRelDataType.create(this, structuredType)
case legacyTypeInformationType: LegacyTypeInformationType[_] =>
createFieldTypeFromLogicalType(
PlannerTypeUtils.removeLegacyTypes(legacyTypeInformationType))
}
case LogicalTypeRoot.ARRAY =>
val arrayType = t.asInstanceOf[ArrayType]
createArrayType(createFieldTypeFromLogicalType(arrayType.getElementType), -1)
case LogicalTypeRoot.MAP =>
val mapType = t.asInstanceOf[MapType]
createMapType(
createFieldTypeFromLogicalType(mapType.getKeyType),
createFieldTypeFromLogicalType(mapType.getValueType))
case LogicalTypeRoot.MULTISET =>
val multisetType = t.asInstanceOf[MultisetType]
createMultisetType(createFieldTypeFromLogicalType(multisetType.getElementType), -1)
case LogicalTypeRoot.RAW =>
t match {
case rawType: RawType[_] =>
new RawRelDataType(rawType)
case genericType: TypeInformationRawType[_] =>
new GenericRelDataType(genericType, true, getTypeSystem)
case legacyType: LegacyTypeInformationType[_] =>
createFieldTypeFromLogicalType(PlannerTypeUtils.removeLegacyTypes(legacyType))
}
case LogicalTypeRoot.SYMBOL =>
createSqlType(SqlTypeName.SYMBOL)
case LogicalTypeRoot.DESCRIPTOR =>
createSqlType(SqlTypeName.COLUMN_LIST)
case _ @t =>
throw new TableException(s"Type is not supported: $t")
}
// Kind in TimestampType do not affect the hashcode and equals, So we can't put it to seenTypes
val relType = t.getTypeRoot match {
case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
val timestampType = t.asInstanceOf[TimestampType]
timestampType.getKind match {
case TimestampKind.ROWTIME => createRowtimeIndicatorType(t.isNullable, false)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP, timestampType.getPrecision)
case TimestampKind.PROCTIME =>
throw new TableException(
s"Processing time indicator only supports" +
s" LocalZonedTimestampType, but actual is TimestampType." +
s" This is a bug in planner, please file an issue.")
}
case LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
val lzTs = t.asInstanceOf[LocalZonedTimestampType]
lzTs.getKind match {
case TimestampKind.PROCTIME => createProctimeIndicatorType(t.isNullable)
case TimestampKind.ROWTIME => createRowtimeIndicatorType(t.isNullable, true)
case TimestampKind.REGULAR =>
createSqlType(TIMESTAMP_WITH_LOCAL_TIME_ZONE, lzTs.getPrecision)
}
case _ =>
seenTypes.get(t) match {
case Some(retType: RelDataType) => retType
case None =>
val refType = newRelDataType()
seenTypes.put(t, refType)
refType
}
}
createTypeWithNullability(relType, t.isNullable)
}