in flink-connector-hive/src/main/java/org/apache/flink/table/planner/utils/FlinkTypeUtils.java [57:166]
public static LogicalType toLogicalType(RelDataType relDataType) {
SqlTypeName sqlTypeName = relDataType.getSqlTypeName();
boolean isNullable = relDataType.isNullable();
if (YEAR_INTERVAL_TYPES.contains(sqlTypeName)) {
return DataTypes.INTERVAL(DataTypes.MONTH()).getLogicalType().copy(isNullable);
} else if (DAY_INTERVAL_TYPES.contains(sqlTypeName)) {
if (relDataType.getPrecision() > 3) {
throw new TableException(
"DAY_INTERVAL_TYPES precision is not supported:"
+ relDataType.getPrecision());
}
return DataTypes.INTERVAL(DataTypes.SECOND(3)).getLogicalType().copy(isNullable);
}
LogicalType logicalType;
switch (sqlTypeName) {
case BOOLEAN:
logicalType = new BooleanType();
break;
case TINYINT:
logicalType = new TinyIntType();
break;
case SMALLINT:
logicalType = new SmallIntType();
break;
case INTEGER:
logicalType = new IntType();
break;
case BIGINT:
logicalType = new BigIntType();
break;
case FLOAT:
logicalType = new FloatType();
break;
case DOUBLE:
logicalType = new DoubleType();
break;
case CHAR:
logicalType =
relDataType.getPrecision() == 0
? CharType.ofEmptyLiteral()
: new CharType(relDataType.getPrecision());
break;
case VARCHAR:
logicalType =
relDataType.getPrecision() == 0
? VarCharType.ofEmptyLiteral()
: new VarCharType(relDataType.getPrecision());
break;
case BINARY:
logicalType =
relDataType.getPrecision() == 0
? BinaryType.ofEmptyLiteral()
: new BinaryType(relDataType.getPrecision());
break;
case VARBINARY:
logicalType =
relDataType.getPrecision() == 0
? VarBinaryType.ofEmptyLiteral()
: new VarBinaryType(relDataType.getPrecision());
break;
case DECIMAL:
logicalType = new DecimalType(relDataType.getPrecision(), relDataType.getScale());
break;
case TIMESTAMP:
logicalType = new TimestampType(relDataType.getPrecision());
break;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
logicalType = new LocalZonedTimestampType(relDataType.getPrecision());
break;
case DATE:
logicalType = new DateType();
break;
case TIME:
logicalType = new TimeType();
break;
case NULL:
logicalType = new NullType();
break;
case SYMBOL:
logicalType = new SymbolType();
break;
case ROW:
{
Preconditions.checkArgument(relDataType.isStruct());
logicalType =
RowType.of(
relDataType.getFieldList().stream()
.map(fieldType -> toLogicalType(fieldType.getType()))
.toArray(LogicalType[]::new),
relDataType.getFieldNames().toArray(new String[0]));
break;
}
case MULTISET:
logicalType = new MultisetType(toLogicalType(relDataType.getComponentType()));
break;
case MAP:
logicalType =
new MapType(
toLogicalType(relDataType.getKeyType()),
toLogicalType(relDataType.getValueType()));
break;
case ARRAY:
logicalType = new ArrayType(toLogicalType(relDataType.getComponentType()));
break;
default:
// shouldn't arrive in here
throw new TableException("Type is not supported: " + relDataType.getSqlTypeName());
}
return logicalType.copy(isNullable);
}