def toLogicalType()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala [547:662]


  def toLogicalType(relDataType: RelDataType): LogicalType = {
    val logicalType = relDataType.getSqlTypeName match {
      case BOOLEAN => new BooleanType()
      case TINYINT => new TinyIntType()
      case SMALLINT => new SmallIntType()
      case INTEGER => new IntType()
      case BIGINT => new BigIntType()
      case FLOAT => new FloatType()
      case DOUBLE => new DoubleType()
      case CHAR =>
        if (relDataType.getPrecision == 0) {
          CharType.ofEmptyLiteral
        } else {
          new CharType(relDataType.getPrecision)
        }
      case VARCHAR =>
        if (relDataType.getPrecision == 0) {
          VarCharType.ofEmptyLiteral
        } else {
          new VarCharType(relDataType.getPrecision)
        }
      case BINARY =>
        if (relDataType.getPrecision == 0) {
          BinaryType.ofEmptyLiteral
        } else {
          new BinaryType(relDataType.getPrecision)
        }
      case VARBINARY =>
        if (relDataType.getPrecision == 0) {
          VarBinaryType.ofEmptyLiteral
        } else {
          new VarBinaryType(relDataType.getPrecision)
        }
      case DECIMAL => new DecimalType(relDataType.getPrecision, relDataType.getScale)

      // time indicators
      case TIMESTAMP if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
        val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
        if (indicator.isEventTime) {
          new TimestampType(true, TimestampKind.ROWTIME, 3)
        } else {
          throw new TableException(
            s"Processing time indicator only supports" +
              s" LocalZonedTimestampType, but actual is TimestampType." +
              s" This is a bug in planner, please file an issue.")
        }
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE if relDataType.isInstanceOf[TimeIndicatorRelDataType] =>
        val indicator = relDataType.asInstanceOf[TimeIndicatorRelDataType]
        if (indicator.isEventTime) {
          new LocalZonedTimestampType(true, TimestampKind.ROWTIME, 3)
        } else {
          new LocalZonedTimestampType(true, TimestampKind.PROCTIME, 3)
        }

      // temporal types
      case DATE => new DateType()
      case TIME =>
        if (relDataType.getPrecision > 3) {
          throw new TableException(s"TIME precision is not supported: ${relDataType.getPrecision}")
        }
        // the planner supports precision 3, but for consistency with old planner, we set it to 0.
        new TimeType()
      case TIMESTAMP =>
        new TimestampType(relDataType.getPrecision)
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
        new LocalZonedTimestampType(relDataType.getPrecision)
      case typeName if YEAR_INTERVAL_TYPES.contains(typeName) =>
        DataTypes.INTERVAL(DataTypes.MONTH).getLogicalType
      case typeName if DAY_INTERVAL_TYPES.contains(typeName) =>
        if (relDataType.getPrecision > 3) {
          throw new TableException(
            s"DAY_INTERVAL_TYPES precision is not supported: ${relDataType.getPrecision}")
        }
        DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType

      case NULL =>
        new NullType()

      case SYMBOL =>
        new SymbolType()

      case COLUMN_LIST =>
        new DescriptorType()

      // extract encapsulated Type
      case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
        val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
        genericRelDataType.genericType

      case ROW if relDataType.isInstanceOf[RelRecordType] =>
        toLogicalRowType(relDataType)

      case STRUCTURED if relDataType.isInstanceOf[StructuredRelDataType] =>
        relDataType.asInstanceOf[StructuredRelDataType].getStructuredType

      case MULTISET => new MultisetType(toLogicalType(relDataType.getComponentType))

      case ARRAY => new ArrayType(toLogicalType(relDataType.getComponentType))

      case MAP if relDataType.isInstanceOf[MapSqlType] =>
        val mapRelDataType = relDataType.asInstanceOf[MapSqlType]
        new MapType(
          toLogicalType(mapRelDataType.getKeyType),
          toLogicalType(mapRelDataType.getValueType))

      // CURSOR for UDTF case, whose type info will never be used, just a placeholder
      case CURSOR => new TypeInformationRawType[Nothing](new NothingTypeInfo)

      case OTHER if relDataType.isInstanceOf[RawRelDataType] =>
        relDataType.asInstanceOf[RawRelDataType].getRawType

      case _ @t =>
        throw new TableException(s"Type is not supported: $t")
    }
    logicalType.copy(relDataType.isNullable)
  }