private static Type convertField()

in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java [362:537]


  /**
   * Converts a Flink {@link TypeInformation} into the corresponding Parquet {@link Type}.
   *
   * <p>Handled inputs are {@link BasicTypeInfo}, {@link MapTypeInfo} (String keys only),
   * {@link ObjectArrayTypeInfo} (group-typed components only), {@link BasicArrayTypeInfo},
   * {@link SqlTimeTypeInfo} (DATE, TIME, TIMESTAMP) and {@link RowTypeInfo}. Rows are converted
   * recursively, and every nested field inherits the row's repetition. A row converted with a
   * {@code null} field name becomes the root {@link MessageType}.
   *
   * @param fieldName name of the resulting Parquet field, or {@code null} for the root row
   * @param typeInfo Flink type to convert
   * @param inheritRepetition repetition to apply to the resulting field; {@code null} means
   *     {@link Type.Repetition#OPTIONAL}
   * @param legacyMode selects the list layout used for {@link BasicArrayTypeInfo}. When
   *     {@code true}, the element is wrapped in an extra repeated group; when {@code false}, the
   *     element is emitted as a repeated primitive.
   * @return the converted Parquet type, never {@code null}
   * @throws UnsupportedOperationException if {@code typeInfo}, or a type nested inside it, has
   *     no Parquet mapping in this converter
   */
  private static Type convertField(
      String fieldName,
      TypeInformation<?> typeInfo,
      Type.Repetition inheritRepetition,
      boolean legacyMode) {
    // Left unassigned on purpose: the compiler then proves every branch either assigns or throws,
    // so this method can no longer silently return null.
    Type fieldType;

    Type.Repetition repetition =
        inheritRepetition == null ? Type.Repetition.OPTIONAL : inheritRepetition;
    if (typeInfo instanceof BasicTypeInfo) {
      BasicTypeInfo<?> basicTypeInfo = (BasicTypeInfo<?>) typeInfo;
      if (basicTypeInfo.equals(BasicTypeInfo.BIG_DEC_TYPE_INFO)
          || basicTypeInfo.equals(BasicTypeInfo.BIG_INT_TYPE_INFO)) {
        // NOTE(review): no precision/scale is set on this DECIMAL annotation; depending on the
        // Parquet version the builder may reject it. Confirm BIG_DEC/BIG_INT fields are supported.
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.DECIMAL)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.INT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_32)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                .as(OriginalType.INT_64)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_16)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_8)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.CHAR_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.DATE_TYPE_INFO)
          || basicTypeInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
        // NOTE(review): DATE_TYPE_INFO is stored as a UTF8 string here rather than as a Parquet
        // DATE/TIMESTAMP; presumably this matches how the writer serializes it. Confirm.
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8)
                .named(fieldName);
      } else {
        // Previously this fell through and returned null, which only failed later, far from
        // the cause.
        throw new UnsupportedOperationException(
            String.format("Can not convert Flink BasicTypeInfo %s to Parquet type", typeInfo));
      }
    } else if (typeInfo instanceof MapTypeInfo) {
      MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) typeInfo;

      if (mapTypeInfo.getKeyTypeInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) {
        // Only the value type is configured; the key is left to the map builder's default.
        fieldType =
            Types.map(repetition)
                .value(
                    convertField(
                        MAP_VALUE,
                        mapTypeInfo.getValueTypeInfo(),
                        Type.Repetition.OPTIONAL,
                        legacyMode))
                .named(fieldName);
      } else {
        throw new UnsupportedOperationException(
            String.format(
                "Can not convert Flink MapTypeInfo %s to Parquet"
                    + " Map type as key has to be String",
                typeInfo));
      }
    } else if (typeInfo instanceof ObjectArrayTypeInfo) {
      ObjectArrayTypeInfo<?, ?> objectArrayTypeInfo = (ObjectArrayTypeInfo<?, ?>) typeInfo;

      // The component is converted as REQUIRED, and its fields are moved into the repeated
      // element group. Only group-typed components (e.g. rows) can therefore be represented.
      Type componentType =
          convertField(
              LIST_ELEMENT,
              objectArrayTypeInfo.getComponentInfo(),
              Type.Repetition.REQUIRED,
              legacyMode);
      if (componentType.isPrimitive()) {
        throw new UnsupportedOperationException(
            String.format(
                "Can not convert Flink ObjectArrayTypeInfo %s to Parquet"
                    + " List type as its element type is not a group type",
                typeInfo));
      }

      GroupType elementGroup =
          new GroupType(
              Type.Repetition.REPEATED, LIST_ELEMENT, componentType.asGroupType().getFields());
      fieldType =
          Types.buildGroup(repetition)
              .addField(elementGroup)
              .as(OriginalType.LIST)
              .named(fieldName);
    } else if (typeInfo instanceof BasicArrayTypeInfo) {
      BasicArrayTypeInfo<?, ?> basicArrayType = (BasicArrayTypeInfo<?, ?>) typeInfo;

      if (legacyMode) {
        // Legacy layout: LIST group -> repeated group LIST_GROUP_NAME -> required element.
        Type listGroup =
            Types.repeatedGroup()
                .addField(
                    convertField(
                        LIST_ELEMENT,
                        basicArrayType.getComponentInfo(),
                        Type.Repetition.REQUIRED,
                        legacyMode))
                .named(LIST_GROUP_NAME);

        fieldType =
            Types.buildGroup(repetition)
                .addField(listGroup)
                .as(OriginalType.LIST)
                .named(fieldName);
      } else {
        // Non-legacy layout: LIST group -> repeated primitive LIST_ARRAY_TYPE. The component is
        // converted first only to obtain its primitive type name and annotation.
        PrimitiveType primitiveType =
            convertField(
                fieldName,
                basicArrayType.getComponentInfo(),
                Type.Repetition.REQUIRED,
                legacyMode)
                .asPrimitiveType();
        fieldType =
            Types.buildGroup(repetition)
                .repeated(primitiveType.getPrimitiveTypeName())
                .as(primitiveType.getOriginalType())
                .named(LIST_ARRAY_TYPE)
                .as(OriginalType.LIST)
                .named(fieldName);
      }
    } else if (typeInfo instanceof SqlTimeTypeInfo) {
      if (typeInfo.equals(SqlTimeTypeInfo.DATE)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.DATE)
                .named(fieldName);
      } else if (typeInfo.equals(SqlTimeTypeInfo.TIME)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.TIME_MILLIS)
                .named(fieldName);
      } else if (typeInfo.equals(SqlTimeTypeInfo.TIMESTAMP)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                .as(OriginalType.TIMESTAMP_MILLIS)
                .named(fieldName);
      } else {
        throw new UnsupportedOperationException(
            "Unsupported SqlTimeTypeInfo " + typeInfo);
      }
    } else if (typeInfo instanceof RowTypeInfo) {
      RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
      String[] fieldNames = rowTypeInfo.getFieldNames();
      TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
      List<Type> types = new ArrayList<>(rowTypeInfo.getArity());
      for (int i = 0; i < rowTypeInfo.getArity(); i++) {
        // Nested fields inherit this row's repetition.
        types.add(convertField(fieldNames[i], fieldTypes[i], repetition, legacyMode));
      }

      if (fieldName == null) {
        fieldType = new MessageType(MESSAGE_ROOT, types);
      } else {
        fieldType = new GroupType(repetition, fieldName, types);
      }
    } else {
      // Previously an unconditional cast to RowTypeInfo, which failed with ClassCastException.
      throw new UnsupportedOperationException(
          String.format("Can not convert Flink TypeInformation %s to Parquet type", typeInfo));
    }

    return fieldType;
  }