def createFieldTypeFromLogicalType()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala [72:192]


  /**
   * Converts a Flink [[LogicalType]] into the corresponding Calcite [[RelDataType]].
   *
   * Timestamp types (with and without local time zone) are built on every call and are mapped to
   * time indicator types when their kind is ROWTIME or PROCTIME. All other types are built once
   * and memoized in `seenTypes`, keyed by the logical type. In both cases the nullability of `t`
   * is applied to the result before it is returned.
   *
   * @param t
   *   the logical type to convert
   * @return
   *   the Calcite type for `t`, carrying the nullability of `t`
   * @throws TableException
   *   if the type root is not supported, if the concrete class of `t` is not one of the classes
   *   handled for its type root, or if a `TimestampType` carries the PROCTIME kind
   */
  def createFieldTypeFromLogicalType(t: LogicalType): RelDataType = {
    // Builds the (not yet nullability-adjusted) type for every non-timestamp type root.
    def newRelDataType(): RelDataType = t.getTypeRoot match {
      case LogicalTypeRoot.NULL => createSqlType(NULL)
      case LogicalTypeRoot.BOOLEAN => createSqlType(BOOLEAN)
      case LogicalTypeRoot.TINYINT => createSqlType(TINYINT)
      case LogicalTypeRoot.SMALLINT => createSqlType(SMALLINT)
      case LogicalTypeRoot.INTEGER => createSqlType(INTEGER)
      case LogicalTypeRoot.BIGINT => createSqlType(BIGINT)
      case LogicalTypeRoot.FLOAT => createSqlType(FLOAT)
      case LogicalTypeRoot.DOUBLE => createSqlType(DOUBLE)
      case LogicalTypeRoot.VARCHAR => createSqlType(VARCHAR, t.asInstanceOf[VarCharType].getLength)
      case LogicalTypeRoot.CHAR => createSqlType(CHAR, t.asInstanceOf[CharType].getLength)

      // temporal types
      case LogicalTypeRoot.DATE => createSqlType(DATE)
      case LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE => createSqlType(TIME)

      // interval types
      case LogicalTypeRoot.INTERVAL_YEAR_MONTH =>
        createSqlIntervalType(
          new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
      case LogicalTypeRoot.INTERVAL_DAY_TIME =>
        createSqlIntervalType(
          new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))

      case LogicalTypeRoot.BINARY => createSqlType(BINARY, t.asInstanceOf[BinaryType].getLength)
      case LogicalTypeRoot.VARBINARY =>
        createSqlType(VARBINARY, t.asInstanceOf[VarBinaryType].getLength)

      case LogicalTypeRoot.DECIMAL =>
        t match {
          case decimalType: DecimalType =>
            createSqlType(DECIMAL, decimalType.getPrecision, decimalType.getScale)
          // legacy BigDecimal type information is mapped to a fixed DECIMAL(38, 18)
          case legacyType: LegacyTypeInformationType[_]
              if legacyType.getTypeInformation == BasicTypeInfo.BIG_DEC_TYPE_INFO =>
            createSqlType(DECIMAL, 38, 18)
          case _ =>
            // fail with a planner exception instead of an opaque scala.MatchError
            throw new TableException(s"Type is not supported: $t")
        }

      case LogicalTypeRoot.ROW =>
        val rowType = t.asInstanceOf[RowType]
        buildStructType(
          rowType.getFieldNames,
          rowType.getChildren,
          // fields are not expanded in "SELECT *"
          StructKind.PEEK_FIELDS_NO_EXPAND)

      case LogicalTypeRoot.STRUCTURED_TYPE =>
        t match {
          case structuredType: StructuredType => StructuredRelDataType.create(this, structuredType)
          // legacy types are first rewritten into regular logical types and then converted
          case legacyTypeInformationType: LegacyTypeInformationType[_] =>
            createFieldTypeFromLogicalType(
              PlannerTypeUtils.removeLegacyTypes(legacyTypeInformationType))
          case _ =>
            // fail with a planner exception instead of an opaque scala.MatchError
            throw new TableException(s"Type is not supported: $t")
        }

      case LogicalTypeRoot.ARRAY =>
        val arrayType = t.asInstanceOf[ArrayType]
        createArrayType(createFieldTypeFromLogicalType(arrayType.getElementType), -1)

      case LogicalTypeRoot.MAP =>
        val mapType = t.asInstanceOf[MapType]
        createMapType(
          createFieldTypeFromLogicalType(mapType.getKeyType),
          createFieldTypeFromLogicalType(mapType.getValueType))

      case LogicalTypeRoot.MULTISET =>
        val multisetType = t.asInstanceOf[MultisetType]
        createMultisetType(createFieldTypeFromLogicalType(multisetType.getElementType), -1)

      case LogicalTypeRoot.RAW =>
        t match {
          case rawType: RawType[_] =>
            new RawRelDataType(rawType)
          case genericType: TypeInformationRawType[_] =>
            new GenericRelDataType(genericType, true, getTypeSystem)
          // legacy types are first rewritten into regular logical types and then converted
          case legacyType: LegacyTypeInformationType[_] =>
            createFieldTypeFromLogicalType(PlannerTypeUtils.removeLegacyTypes(legacyType))
          case _ =>
            // fail with a planner exception instead of an opaque scala.MatchError
            throw new TableException(s"Type is not supported: $t")
        }

      case LogicalTypeRoot.SYMBOL =>
        createSqlType(SqlTypeName.SYMBOL)

      case LogicalTypeRoot.DESCRIPTOR =>
        createSqlType(SqlTypeName.COLUMN_LIST)

      // Bound to its own name so that the method parameter `t` is not shadowed; the message
      // reports the unsupported type root.
      case unsupportedRoot =>
        throw new TableException(s"Type is not supported: $unsupportedRoot")
    }

    // The kind of a timestamp type does not affect its hashCode and equals, so timestamp types
    // cannot be stored in seenTypes; they are created on every call instead.
    val relType = t.getTypeRoot match {
      case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
        val timestampType = t.asInstanceOf[TimestampType]
        timestampType.getKind match {
          case TimestampKind.ROWTIME => createRowtimeIndicatorType(t.isNullable, false)
          case TimestampKind.REGULAR => createSqlType(TIMESTAMP, timestampType.getPrecision)
          case TimestampKind.PROCTIME =>
            throw new TableException(
              s"Processing time indicator only supports" +
                s" LocalZonedTimestampType, but actual is TimestampType." +
                s" This is a bug in planner, please file an issue.")
        }
      case LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
        val lzTs = t.asInstanceOf[LocalZonedTimestampType]
        lzTs.getKind match {
          case TimestampKind.PROCTIME => createProctimeIndicatorType(t.isNullable)
          case TimestampKind.ROWTIME => createRowtimeIndicatorType(t.isNullable, true)
          case TimestampKind.REGULAR =>
            createSqlType(TIMESTAMP_WITH_LOCAL_TIME_ZONE, lzTs.getPrecision)
        }
      case _ =>
        // every other type is built once and then served from the cache
        seenTypes.get(t) match {
          case Some(retType: RelDataType) => retType
          case None =>
            val refType = newRelDataType()
            seenTypes.put(t, refType)
            refType
        }
    }

    // apply the nullability of the logical type to the (possibly cached) Calcite type
    createTypeWithNullability(relType, t.isNullable)
  }