public static TypeInformation convertParquetTypeToTypeInfo()

in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java [107:347]


  /**
   * Converts a Parquet {@link Type} to a Flink {@link TypeInformation}.
   *
   * <p>Primitive types are mapped through their {@link OriginalType} annotation (e.g. a
   * {@code BINARY} annotated with {@code UTF8} becomes {@code STRING_TYPE_INFO}, an
   * {@code INT96} is read as a millisecond timestamp). Group types annotated with
   * {@code LIST} or {@code MAP}/{@code MAP_KEY_VALUE} become Flink array / map type
   * information; an un-annotated group is treated as a nested record and converted
   * field-by-field via {@code convertFields}.
   *
   * @param fieldType the Parquet type to convert
   * @return the corresponding Flink type information
   * @throws UnsupportedOperationException if the Parquet type or annotation has no Flink
   *     counterpart, or a LIST/MAP group does not follow a recognized layout
   * @throws IllegalArgumentException if a MAP key is not a required UTF8 binary
   */
  public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) {
    if (fieldType.isPrimitive()) {
      return convertParquetPrimitiveToTypeInfo(fieldType.asPrimitiveType());
    }

    GroupType parquetGroupType = fieldType.asGroupType();
    OriginalType originalType = parquetGroupType.getOriginalType();
    if (originalType == null) {
      // if no original type than it is a record
      return convertFields(parquetGroupType.getFields());
    }

    switch (originalType) {
      case LIST:
        return convertParquetListToTypeInfo(parquetGroupType);
      case MAP_KEY_VALUE:
      case MAP:
        return convertParquetMapToTypeInfo(parquetGroupType);
      default:
        throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
    }
  }

  /** Maps a Parquet primitive type (with its optional {@link OriginalType} annotation) to Flink type info. */
  private static TypeInformation<?> convertParquetPrimitiveToTypeInfo(
      final PrimitiveType primitiveType) {
    OriginalType originalType = primitiveType.getOriginalType();
    switch (primitiveType.getPrimitiveTypeName()) {
      case BINARY:
        if (originalType != null) {
          switch (originalType) {
            case DECIMAL:
              return BasicTypeInfo.BIG_DEC_TYPE_INFO;
            case UTF8:
            case ENUM:
            case JSON:
            case BSON:
              return BasicTypeInfo.STRING_TYPE_INFO;
            default:
              throw new UnsupportedOperationException(
                  "Unsupported original type : "
                      + originalType.name()
                      + " for primitive type BINARY");
          }
        }
        // Un-annotated binary defaults to string.
        return BasicTypeInfo.STRING_TYPE_INFO;
      case BOOLEAN:
        return BasicTypeInfo.BOOLEAN_TYPE_INFO;
      case INT32:
        if (originalType != null) {
          switch (originalType) {
            case TIME_MICROS:
            case TIME_MILLIS:
              return SqlTimeTypeInfo.TIME;
            case TIMESTAMP_MICROS:
            case TIMESTAMP_MILLIS:
              return SqlTimeTypeInfo.TIMESTAMP;
            case DATE:
              return SqlTimeTypeInfo.DATE;
            case UINT_8:
            case UINT_16:
            case UINT_32:
              // Unsigned widths up to 32 bits are all read into a (signed) Java int.
              return BasicTypeInfo.INT_TYPE_INFO;
            case INT_8:
              return org.apache.flink.api.common.typeinfo.Types.BYTE;
            case INT_16:
              return org.apache.flink.api.common.typeinfo.Types.SHORT;
            case INT_32:
              return BasicTypeInfo.INT_TYPE_INFO;
            default:
              throw new UnsupportedOperationException(
                  "Unsupported original type : "
                      + originalType.name()
                      + " for primitive type INT32");
          }
        }
        return BasicTypeInfo.INT_TYPE_INFO;
      case INT64:
        if (originalType != null) {
          switch (originalType) {
            case TIME_MICROS:
              return SqlTimeTypeInfo.TIME;
            case TIMESTAMP_MICROS:
            case TIMESTAMP_MILLIS:
              return SqlTimeTypeInfo.TIMESTAMP;
            case INT_64:
            case DECIMAL:
              return BasicTypeInfo.LONG_TYPE_INFO;
            default:
              throw new UnsupportedOperationException(
                  "Unsupported original type : "
                      + originalType.name()
                      + " for primitive type INT64");
          }
        }
        return BasicTypeInfo.LONG_TYPE_INFO;
      case INT96:
        // It stores a timestamp type data, we read it as millisecond
        return SqlTimeTypeInfo.TIMESTAMP;
      case FLOAT:
        return BasicTypeInfo.FLOAT_TYPE_INFO;
      case DOUBLE:
        return BasicTypeInfo.DOUBLE_TYPE_INFO;
      case FIXED_LEN_BYTE_ARRAY:
        if (originalType != null) {
          switch (originalType) {
            case DECIMAL:
              return BasicTypeInfo.BIG_DEC_TYPE_INFO;
            default:
              throw new UnsupportedOperationException(
                  "Unsupported original type : "
                      + originalType.name()
                      + " for primitive type FIXED_LEN_BYTE_ARRAY");
          }
        }
        return BasicTypeInfo.BIG_DEC_TYPE_INFO;
      default:
        throw new UnsupportedOperationException("Unsupported schema: " + primitiveType);
    }
  }

  /**
   * Converts a Parquet {@code LIST}-annotated group to a Flink array type, handling both the
   * standard 3-level layout and the legacy 2-level backward-compatibility layouts.
   */
  private static TypeInformation<?> convertParquetListToTypeInfo(
      final GroupType parquetGroupType) {
    if (parquetGroupType.getFieldCount() != 1) {
      throw new UnsupportedOperationException(
          "Invalid list type " + parquetGroupType);
    }
    Type repeatedType = parquetGroupType.getType(0);
    if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
      throw new UnsupportedOperationException(
          "Invalid list type " + parquetGroupType);
    }

    if (repeatedType.isPrimitive()) {
      // Legacy 2-level layout: the repeated primitive field is itself the element type.
      return convertParquetPrimitiveListToFlinkArray(repeatedType);
    }

    // Backward-compatibility element group name can be any string (element/array/other)
    GroupType elementType = repeatedType.asGroupType();
    if (elementType.getFieldCount() > 1) {
      // If the repeated field is a group with multiple fields, then its type is the
      // element type and elements are required.
      for (Type type : elementType.getFields()) {
        if (!type.isRepetition(Type.Repetition.REQUIRED)) {
          throw new UnsupportedOperationException(
              String.format(
                  "List field [%s] in List [%s] has to be required. ",
                  type.toString(), parquetGroupType.getName()));
        }
      }
      return ObjectArrayTypeInfo.getInfoFor(convertParquetTypeToTypeInfo(elementType));
    }

    Type internalType = elementType.getType(0);
    if (internalType.isPrimitive()) {
      return convertParquetPrimitiveListToFlinkArray(internalType);
    }

    // No need to do special process for group named array and tuple
    GroupType tupleGroup = internalType.asGroupType();
    if (tupleGroup.getFieldCount() == 1
        && tupleGroup.getFields().get(0).isRepetition(Type.Repetition.REQUIRED)) {
      return ObjectArrayTypeInfo.getInfoFor(convertParquetTypeToTypeInfo(internalType));
    }
    throw new UnsupportedOperationException(
        String.format(
            "Unrecgonized List schema [%s] according to Parquet"
                + " standard",
            parquetGroupType.toString()));
  }

  /** Converts a Parquet {@code MAP}/{@code MAP_KEY_VALUE} group to a Flink {@link MapTypeInfo}. */
  private static TypeInformation<?> convertParquetMapToTypeInfo(
      final GroupType parquetGroupType) {
    // The outer-most level must be a group annotated with MAP
    // that contains a single field named key_value
    if (parquetGroupType.getFieldCount() != 1
        || parquetGroupType.getType(0).isPrimitive()) {
      throw new UnsupportedOperationException(
          "Invalid map type " + parquetGroupType);
    }

    // The middle level must be a repeated group with a key field for map keys
    // and, optionally, a value field for map values. But we can't enforce two
    // strict conditions here because the schema generated by the Parquet lib
    // doesn't contain LogicalType
    // ! mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)
    GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
    if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
        || mapKeyValType.getFieldCount() != 2) {
      throw new UnsupportedOperationException(
          "The middle level of Map should be single field named key_value. Invalid map type "
              + parquetGroupType);
    }

    Type keyType = mapKeyValType.getType(0);

    // The key field encodes the map's key type. This field must have repetition
    // required and must always be present.
    // NOTE: compare with the UTF8 constant on the left so a BINARY key lacking an
    // OriginalType annotation (getOriginalType() == null) fails this validation with
    // the intended IllegalArgumentException instead of a NullPointerException.
    if (!keyType.isPrimitive()
        || !keyType.isRepetition(Type.Repetition.REQUIRED)
        || !keyType.asPrimitiveType()
        .getPrimitiveTypeName()
        .equals(PrimitiveType.PrimitiveTypeName.BINARY)
        || !OriginalType.UTF8.equals(keyType.getOriginalType())) {
      throw new IllegalArgumentException(
          "Map key type must be required binary (UTF8): " + keyType);
    }

    Type valueType = mapKeyValType.getType(1);
    return new MapTypeInfo<>(
        BasicTypeInfo.STRING_TYPE_INFO,
        convertParquetTypeToTypeInfo(valueType));
  }