in flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java [154:197]
private static DataType toFlinkPrimitiveType(PrimitiveTypeInfo hiveType) {
checkNotNull(hiveType, "hiveType cannot be null");
switch (hiveType.getPrimitiveCategory()) {
case CHAR:
return DataTypes.CHAR(((CharTypeInfo) hiveType).getLength());
case VARCHAR:
return DataTypes.VARCHAR(((VarcharTypeInfo) hiveType).getLength());
case STRING:
return DataTypes.STRING();
case BOOLEAN:
return DataTypes.BOOLEAN();
case BYTE:
return DataTypes.TINYINT();
case SHORT:
return DataTypes.SMALLINT();
case INT:
return DataTypes.INT();
case LONG:
return DataTypes.BIGINT();
case FLOAT:
return DataTypes.FLOAT();
case DOUBLE:
return DataTypes.DOUBLE();
case DATE:
return DataTypes.DATE();
case TIMESTAMP:
return DataTypes.TIMESTAMP(9);
case BINARY:
return DataTypes.BYTES();
case DECIMAL:
DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) hiveType;
return DataTypes.DECIMAL(
decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
case INTERVAL_YEAR_MONTH:
return DataTypes.INTERVAL(DataTypes.MONTH());
case INTERVAL_DAY_TIME:
return DataTypes.INTERVAL(DataTypes.SECOND(3));
default:
throw new UnsupportedOperationException(
String.format(
"Flink doesn't support Hive primitive type %s yet", hiveType));
}
}