gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java [303:491]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        default:
          throw new RuntimeException("Unsupported Repetition type");
      }
    }
  }

  /**
   * Maps the nullability flag of a JSON schema onto a Parquet repetition.
   *
   * @param jsonBaseSchema schema whose {@code isNullable()} flag is consulted
   * @return {@code OPTIONAL} when the schema is nullable, {@code REQUIRED} otherwise
   */
  public static Type.Repetition optionalOrRequired(JsonSchema jsonBaseSchema) {
    if (!jsonBaseSchema.isNullable()) {
      return REQUIRED;
    }
    return OPTIONAL;
  }

  /**
   * Converts a JSON array into a Parquet group whose single child field is named {@code ARRAY_KEY};
   * every array element is converted and appended under that field in order.
   */
  public static class ArrayConverter extends CollectionConverter {

    /**
     * @param arraySchema schema of the array column; the item type is read via {@code ARRAY_ITEMS_KEY}.
     *     The trailing {@code true} presumably asks CollectionConverter for a repeated element
     *     converter — TODO confirm against CollectionConverter.
     */
    public ArrayConverter(JsonSchema arraySchema) {
      super(arraySchema, arraySchema.getElementTypeUsingKey(ARRAY_ITEMS_KEY), true);
    }

    /**
     * Appends the converted form of each array element under {@code ARRAY_KEY}.
     *
     * @param value JSON element expected to be a {@link JsonArray}
     * @return the populated {@link ParquetGroup}
     */
    @Override
    Object convertField(JsonElement value) {
      ParquetGroup group = new ParquetGroup((GroupType) schema());
      JsonElementConverter itemConverter = this.elementConverter;
      JsonArray items = (JsonArray) value;
      for (JsonElement item : items) {
        Object convertedItem = itemConverter.convert(item);
        group.add(ARRAY_KEY, convertedItem);
      }
      return group;
    }

    /**
     * Wraps the element converter's type in a group named after the column; the group's repetition
     * follows the column's nullability.
     */
    @Override
    protected Type buildSchema() {
      Type itemType = this.elementConverter.schema();
      List<Type> children = new ArrayList<>(1);
      children.add(itemType);
      return new GroupType(optionalOrRequired(this.jsonSchema), this.jsonSchema.getColumnName(), children);
    }

    /** Describes one array item: a nullable column named {@code ARRAY_KEY} of the declared item type. */
    @Override
    JsonSchema getElementSchema() {
      JsonSchema itemSchema = JsonSchema.buildBaseSchema(this.elementType, true);
      itemSchema.setColumnName(ARRAY_KEY);
      return itemSchema;
    }
  }

  /**
   * Converts a JSON string restricted to a fixed set of symbols, delegating the actual value
   * conversion to a {@code STRING} element converter.
   */
  public static class EnumConverter extends CollectionConverter {
    /** Allowed symbols, read as strings from the schema's {@code getSymbols()} array. */
    private final HashSet<String> symbols = new HashSet<>();

    /**
     * @param enumSchema enum schema; each entry of {@code getSymbols()} becomes an allowed value
     */
    public EnumConverter(JsonSchema enumSchema) {
      super(enumSchema, STRING, false);
      JsonArray symbolsArray = enumSchema.getSymbols();
      symbolsArray.forEach(e -> symbols.add(e.getAsString()));
    }

    /**
     * Validates {@code value} against the allowed symbols and converts it.
     *
     * @param value JSON string (or JSON null, accepted only when the enum is nullable)
     * @return the value converted by the string element converter
     * @throws RuntimeException if the value is not an allowed symbol, or is null for a non-nullable enum
     */
    @Override
    Object convertField(JsonElement value) {
      // Handle JSON null before consulting the symbol set: JsonNull does not support getAsString(),
      // so testing membership first would throw before the nullable case is ever considered.
      if (value.isJsonNull()) {
        if (this.jsonSchema.isNullable()) {
          return this.elementConverter.convert(value);
        }
        throw new RuntimeException("Symbol null does not belong to set " + symbols.toString());
      }
      String symbol = value.getAsString();
      if (symbols.contains(symbol)) {
        return this.elementConverter.convert(value);
      }
      throw new RuntimeException("Symbol " + symbol + " does not belong to set " + symbols.toString());
    }

    /** An enum is stored with exactly the element converter's (string) type. */
    @Override
    protected Type buildSchema() {
      return this.elementConverter.schema();
    }

    /** Describes the stored value: a string column with the enum's own name and nullability. */
    @Override
    JsonSchema getElementSchema() {
      JsonSchema stringSchema = JsonSchema.buildBaseSchema(STRING, this.jsonSchema.isNullable());
      stringSchema.setColumnName(this.jsonSchema.getColumnName());
      return stringSchema;
    }
  }

  public static class RecordConverter extends JsonElementConverter {

    private final HashMap<String, JsonElementConverter> converters;
    private final RecordType recordType;
    private final Type schema;

    public enum RecordType {
      ROOT, CHILD
    }

    public RecordConverter(JsonSchema recordSchema) {
      this(recordSchema, CHILD);
    }

    public RecordConverter(JsonSchema recordSchema, RecordType recordType) {
      super(recordSchema);
      this.converters = new HashMap<>();
      this.recordType = recordType;
      this.schema = buildSchema();
    }

    @Override
    Object convertField(JsonElement value) {
      ParquetGroup r1 = new ParquetGroup((GroupType) schema());
      JsonObject inputRecord = value.getAsJsonObject();
      for (Map.Entry<String, JsonElement> entry : inputRecord.entrySet()) {
        String key = entry.getKey();
        JsonElementConverter converter = this.converters.get(key);
        Object convertedValue = converter.convert(entry.getValue());
        boolean valueIsNull = convertedValue == null;
        Type.Repetition repetition = optionalOrRequired(converter.jsonSchema);
        if (valueIsNull && repetition.equals(OPTIONAL)) {
          continue;
        }
        r1.add(key, convertedValue);
      }
      return r1;
    }

    private Type buildSchema() {
      JsonArray inputSchema = this.jsonSchema.getDataTypeValues();
      List<Type> parquetTypes = new ArrayList<>();
      for (JsonElement element : inputSchema) {
        JsonObject map = (JsonObject) element;
        JsonSchema elementSchema = new JsonSchema(map);
        String columnName = elementSchema.getColumnName();
        JsonElementConverter converter = JsonElementConversionFactory.getConverter(elementSchema, false);
        Type schemaType = converter.schema();
        this.converters.put(columnName, converter);
        parquetTypes.add(schemaType);
      }
      String docName = this.jsonSchema.getColumnName();
      switch (recordType) {
        case ROOT:
          return new MessageType(docName, parquetTypes);
        case CHILD:
          return new GroupType(optionalOrRequired(this.jsonSchema), docName, parquetTypes);
        default:
          throw new RuntimeException("Unsupported Record type");
      }
    }

    @Override
    public Type schema() {
      return this.schema;
    }
  }

  public static class MapConverter extends CollectionConverter {

    /**
     * Creates a converter for a JSON map column whose value type is read from the schema via
     * {@code MAP_ITEMS_KEY}.
     *
     * @param mapSchema schema of the map column
     */
    // NOTE(review): the trailing false presumably means "element converter is not repeated"
    // (ArrayConverter passes true) — TODO confirm against CollectionConverter.
    public MapConverter(JsonSchema mapSchema) {
      super(mapSchema, mapSchema.getElementTypeUsingKey(MAP_ITEMS_KEY), false);
    }

    /**
     * Converts a JSON object into the map group: every entry becomes a {@code MAP_KEY} child group
     * carrying the entry key under {@code MAP_KEY_COLUMN_NAME} and the converted value under
     * {@code MAP_VALUE_COLUMN_NAME}.
     *
     * @param value JSON element expected to be a {@link JsonObject}
     * @return the populated map {@link ParquetGroup}
     */
    @Override
    Object convertField(JsonElement value) {
      ParquetGroup result = new ParquetGroup((GroupType) schema());
      JsonElementConverter valueConverter = this.elementConverter;
      JsonObject source = (JsonObject) value;

      source.entrySet().forEach(entry -> {
        ParquetGroup pair = (ParquetGroup) result.addGroup(MAP_KEY);
        pair.add(MAP_KEY_COLUMN_NAME, entry.getKey());
        pair.add(MAP_VALUE_COLUMN_NAME, valueConverter.convert(entry.getValue()));
      });

      return result;
    }

    /**
     * Builds the map layout: a group named after the column that contains one repeated
     * {@code MAP_KEY} group holding the key field and the value field.
     *
     * @return the map group type, optional when the column is nullable and required otherwise
     * @throws RuntimeException if the column's repetition is neither OPTIONAL nor REQUIRED
     */
    @Override
    protected Type buildSchema() {
      JsonElementConverter keyConverter = getKeyConverter();
      GroupType keyValueGroup =
          Types.repeatedGroup().addFields(keyConverter.schema(), this.elementConverter.schema()).named(MAP_KEY)
              .asGroupType();
      String columnName = this.jsonSchema.getColumnName();
      switch (optionalOrRequired(this.jsonSchema)) {
        case OPTIONAL:
          return Types.optionalGroup().addFields(keyValueGroup).named(columnName).asGroupType();
        case REQUIRED:
          return Types.requiredGroup().addFields(keyValueGroup).named(columnName).asGroupType();
        default:
          // Fail loudly instead of handing a null schema to the caller.
          throw new RuntimeException("Unsupported Repetition type");
      }
    }

    /**
     * Describes a single map value: a non-nullable column named {@code MAP_VALUE_COLUMN_NAME} of the
     * map's declared value type.
     */
    @Override
    JsonSchema getElementSchema() {
      JsonSchema valueSchema = JsonSchema.buildBaseSchema(this.elementType, false);
      valueSchema.setColumnName(MAP_VALUE_COLUMN_NAME);
      return valueSchema;
    }

    public JsonElementConverter getKeyConverter() {
      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING, false);
      jsonSchema.setColumnName(MAP_KEY_COLUMN_NAME);
      return getConverter(jsonSchema, false);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java [303:491]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        default:
          throw new RuntimeException("Unsupported Repetition type");
      }
    }
  }

  /**
   * Maps the nullability flag of a JSON schema onto a Parquet repetition.
   *
   * @param jsonBaseSchema schema whose {@code isNullable()} flag is consulted
   * @return {@code OPTIONAL} when the schema is nullable, {@code REQUIRED} otherwise
   */
  public static Type.Repetition optionalOrRequired(JsonSchema jsonBaseSchema) {
    if (!jsonBaseSchema.isNullable()) {
      return REQUIRED;
    }
    return OPTIONAL;
  }

  /**
   * Converts a JSON array into a Parquet group whose single child field is named {@code ARRAY_KEY};
   * every array element is converted and appended under that field in order.
   */
  public static class ArrayConverter extends CollectionConverter {

    /**
     * @param arraySchema schema of the array column; the item type is read via {@code ARRAY_ITEMS_KEY}.
     *     The trailing {@code true} presumably asks CollectionConverter for a repeated element
     *     converter — TODO confirm against CollectionConverter.
     */
    public ArrayConverter(JsonSchema arraySchema) {
      super(arraySchema, arraySchema.getElementTypeUsingKey(ARRAY_ITEMS_KEY), true);
    }

    /**
     * Appends the converted form of each array element under {@code ARRAY_KEY}.
     *
     * @param value JSON element expected to be a {@link JsonArray}
     * @return the populated {@link ParquetGroup}
     */
    @Override
    Object convertField(JsonElement value) {
      ParquetGroup group = new ParquetGroup((GroupType) schema());
      JsonElementConverter itemConverter = this.elementConverter;
      JsonArray items = (JsonArray) value;
      for (JsonElement item : items) {
        Object convertedItem = itemConverter.convert(item);
        group.add(ARRAY_KEY, convertedItem);
      }
      return group;
    }

    /**
     * Wraps the element converter's type in a group named after the column; the group's repetition
     * follows the column's nullability.
     */
    @Override
    protected Type buildSchema() {
      Type itemType = this.elementConverter.schema();
      List<Type> children = new ArrayList<>(1);
      children.add(itemType);
      return new GroupType(optionalOrRequired(this.jsonSchema), this.jsonSchema.getColumnName(), children);
    }

    /** Describes one array item: a nullable column named {@code ARRAY_KEY} of the declared item type. */
    @Override
    JsonSchema getElementSchema() {
      JsonSchema itemSchema = JsonSchema.buildBaseSchema(this.elementType, true);
      itemSchema.setColumnName(ARRAY_KEY);
      return itemSchema;
    }
  }

  /**
   * Converts a JSON string restricted to a fixed set of symbols, delegating the actual value
   * conversion to a {@code STRING} element converter.
   */
  public static class EnumConverter extends CollectionConverter {
    /** Allowed symbols, read as strings from the schema's {@code getSymbols()} array. */
    private final HashSet<String> symbols = new HashSet<>();

    /**
     * @param enumSchema enum schema; each entry of {@code getSymbols()} becomes an allowed value
     */
    public EnumConverter(JsonSchema enumSchema) {
      super(enumSchema, STRING, false);
      JsonArray symbolsArray = enumSchema.getSymbols();
      symbolsArray.forEach(e -> symbols.add(e.getAsString()));
    }

    /**
     * Validates {@code value} against the allowed symbols and converts it.
     *
     * @param value JSON string (or JSON null, accepted only when the enum is nullable)
     * @return the value converted by the string element converter
     * @throws RuntimeException if the value is not an allowed symbol, or is null for a non-nullable enum
     */
    @Override
    Object convertField(JsonElement value) {
      // Handle JSON null before consulting the symbol set: JsonNull does not support getAsString(),
      // so testing membership first would throw before the nullable case is ever considered.
      if (value.isJsonNull()) {
        if (this.jsonSchema.isNullable()) {
          return this.elementConverter.convert(value);
        }
        throw new RuntimeException("Symbol null does not belong to set " + symbols.toString());
      }
      String symbol = value.getAsString();
      if (symbols.contains(symbol)) {
        return this.elementConverter.convert(value);
      }
      throw new RuntimeException("Symbol " + symbol + " does not belong to set " + symbols.toString());
    }

    /** An enum is stored with exactly the element converter's (string) type. */
    @Override
    protected Type buildSchema() {
      return this.elementConverter.schema();
    }

    /** Describes the stored value: a string column with the enum's own name and nullability. */
    @Override
    JsonSchema getElementSchema() {
      JsonSchema stringSchema = JsonSchema.buildBaseSchema(STRING, this.jsonSchema.isNullable());
      stringSchema.setColumnName(this.jsonSchema.getColumnName());
      return stringSchema;
    }
  }

  public static class RecordConverter extends JsonElementConverter {

    private final HashMap<String, JsonElementConverter> converters;
    private final RecordType recordType;
    private final Type schema;

    public enum RecordType {
      ROOT, CHILD
    }

    public RecordConverter(JsonSchema recordSchema) {
      this(recordSchema, CHILD);
    }

    public RecordConverter(JsonSchema recordSchema, RecordType recordType) {
      super(recordSchema);
      this.converters = new HashMap<>();
      this.recordType = recordType;
      this.schema = buildSchema();
    }

    @Override
    Object convertField(JsonElement value) {
      ParquetGroup r1 = new ParquetGroup((GroupType) schema());
      JsonObject inputRecord = value.getAsJsonObject();
      for (Map.Entry<String, JsonElement> entry : inputRecord.entrySet()) {
        String key = entry.getKey();
        JsonElementConverter converter = this.converters.get(key);
        Object convertedValue = converter.convert(entry.getValue());
        boolean valueIsNull = convertedValue == null;
        Type.Repetition repetition = optionalOrRequired(converter.jsonSchema);
        if (valueIsNull && repetition.equals(OPTIONAL)) {
          continue;
        }
        r1.add(key, convertedValue);
      }
      return r1;
    }

    private Type buildSchema() {
      JsonArray inputSchema = this.jsonSchema.getDataTypeValues();
      List<Type> parquetTypes = new ArrayList<>();
      for (JsonElement element : inputSchema) {
        JsonObject map = (JsonObject) element;
        JsonSchema elementSchema = new JsonSchema(map);
        String columnName = elementSchema.getColumnName();
        JsonElementConverter converter = JsonElementConversionFactory.getConverter(elementSchema, false);
        Type schemaType = converter.schema();
        this.converters.put(columnName, converter);
        parquetTypes.add(schemaType);
      }
      String docName = this.jsonSchema.getColumnName();
      switch (recordType) {
        case ROOT:
          return new MessageType(docName, parquetTypes);
        case CHILD:
          return new GroupType(optionalOrRequired(this.jsonSchema), docName, parquetTypes);
        default:
          throw new RuntimeException("Unsupported Record type");
      }
    }

    @Override
    public Type schema() {
      return this.schema;
    }
  }

  public static class MapConverter extends CollectionConverter {

    /**
     * Creates a converter for a JSON map column whose value type is read from the schema via
     * {@code MAP_ITEMS_KEY}.
     *
     * @param mapSchema schema of the map column
     */
    // NOTE(review): the trailing false presumably means "element converter is not repeated"
    // (ArrayConverter passes true) — TODO confirm against CollectionConverter.
    public MapConverter(JsonSchema mapSchema) {
      super(mapSchema, mapSchema.getElementTypeUsingKey(MAP_ITEMS_KEY), false);
    }

    /**
     * Converts a JSON object into the map group: every entry becomes a {@code MAP_KEY} child group
     * carrying the entry key under {@code MAP_KEY_COLUMN_NAME} and the converted value under
     * {@code MAP_VALUE_COLUMN_NAME}.
     *
     * @param value JSON element expected to be a {@link JsonObject}
     * @return the populated map {@link ParquetGroup}
     */
    @Override
    Object convertField(JsonElement value) {
      ParquetGroup result = new ParquetGroup((GroupType) schema());
      JsonElementConverter valueConverter = this.elementConverter;
      JsonObject source = (JsonObject) value;

      source.entrySet().forEach(entry -> {
        ParquetGroup pair = (ParquetGroup) result.addGroup(MAP_KEY);
        pair.add(MAP_KEY_COLUMN_NAME, entry.getKey());
        pair.add(MAP_VALUE_COLUMN_NAME, valueConverter.convert(entry.getValue()));
      });

      return result;
    }

    /**
     * Builds the map layout: a group named after the column that contains one repeated
     * {@code MAP_KEY} group holding the key field and the value field.
     *
     * @return the map group type, optional when the column is nullable and required otherwise
     * @throws RuntimeException if the column's repetition is neither OPTIONAL nor REQUIRED
     */
    @Override
    protected Type buildSchema() {
      JsonElementConverter keyConverter = getKeyConverter();
      GroupType keyValueGroup =
          Types.repeatedGroup().addFields(keyConverter.schema(), this.elementConverter.schema()).named(MAP_KEY)
              .asGroupType();
      String columnName = this.jsonSchema.getColumnName();
      switch (optionalOrRequired(this.jsonSchema)) {
        case OPTIONAL:
          return Types.optionalGroup().addFields(keyValueGroup).named(columnName).asGroupType();
        case REQUIRED:
          return Types.requiredGroup().addFields(keyValueGroup).named(columnName).asGroupType();
        default:
          // Fail loudly instead of handing a null schema to the caller.
          throw new RuntimeException("Unsupported Repetition type");
      }
    }

    /**
     * Describes a single map value: a non-nullable column named {@code MAP_VALUE_COLUMN_NAME} of the
     * map's declared value type.
     */
    @Override
    JsonSchema getElementSchema() {
      JsonSchema valueSchema = JsonSchema.buildBaseSchema(this.elementType, false);
      valueSchema.setColumnName(MAP_VALUE_COLUMN_NAME);
      return valueSchema;
    }

    public JsonElementConverter getKeyConverter() {
      JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING, false);
      jsonSchema.setColumnName(MAP_KEY_COLUMN_NAME);
      return getConverter(jsonSchema, false);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



