public static DataType toFlinkType()

in flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java [122:199]


  public static DataType toFlinkType(Type gravitinoType) {
    switch (gravitinoType.name()) {
      case DOUBLE:
        return DataTypes.DOUBLE();
      case STRING:
        return DataTypes.STRING();
      case INTEGER:
        return DataTypes.INT();
      case LONG:
        return DataTypes.BIGINT();
      case FLOAT:
        return DataTypes.FLOAT();
      case SHORT:
        return DataTypes.SMALLINT();
      case DECIMAL:
        Types.DecimalType decimalType = (Types.DecimalType) gravitinoType;
        return DataTypes.DECIMAL(decimalType.precision(), decimalType.scale());
      case VARCHAR:
        Types.VarCharType varCharType = (Types.VarCharType) gravitinoType;
        return DataTypes.VARCHAR(varCharType.length());
      case FIXED:
        Types.FixedType fixedType = (Types.FixedType) gravitinoType;
        return DataTypes.BINARY(fixedType.length());
      case FIXEDCHAR:
        Types.FixedCharType charType = (Types.FixedCharType) gravitinoType;
        return DataTypes.CHAR(charType.length());
      case BINARY:
        return DataTypes.BYTES();
      case BYTE:
        return DataTypes.TINYINT();
      case BOOLEAN:
        return DataTypes.BOOLEAN();
      case DATE:
        return DataTypes.DATE();
      case TIMESTAMP:
        Types.TimestampType timestampType = (Types.TimestampType) gravitinoType;
        if (timestampType.hasTimeZone()) {
          return DataTypes.TIMESTAMP_LTZ();
        } else {
          return DataTypes.TIMESTAMP();
        }
      case LIST:
        Types.ListType listType = (Types.ListType) gravitinoType;
        return DataTypes.ARRAY(
            nullable(toFlinkType(listType.elementType()), listType.elementNullable()));
      case MAP:
        Types.MapType mapType = (Types.MapType) gravitinoType;
        return DataTypes.MAP(
            toFlinkType(mapType.keyType()),
            nullable(toFlinkType(mapType.valueType()), mapType.valueNullable()));
      case STRUCT:
        Types.StructType structType = (Types.StructType) gravitinoType;
        List<DataTypes.Field> fields =
            Arrays.stream(structType.fields())
                .map(
                    f -> {
                      if (f.comment() == null) {
                        return DataTypes.FIELD(
                            f.name(), nullable(toFlinkType(f.type()), f.nullable()));
                      } else {
                        return DataTypes.FIELD(
                            f.name(), nullable(toFlinkType(f.type()), f.nullable()), f.comment());
                      }
                    })
                .collect(Collectors.toList());
        return DataTypes.ROW(fields);
      case NULL:
        return DataTypes.NULL();
      case TIME:
        return DataTypes.TIME();
      case INTERVAL_YEAR:
        return DataTypes.INTERVAL(DataTypes.YEAR());
      case INTERVAL_DAY:
        return DataTypes.INTERVAL(DataTypes.DAY());
      default:
        throw new UnsupportedOperationException("Not support " + gravitinoType.toString());
    }
  }