private List flattenField()

in gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java [343:431]


  private List<Schema.Field> flattenField(Schema.Field f, ImmutableList<String> parentLineage,
      boolean shouldPopulateLineage, boolean flattenComplexTypes, Optional<Schema> shouldWrapInOption) {
    Preconditions.checkNotNull(f);
    Preconditions.checkNotNull(f.schema());
    Preconditions.checkNotNull(f.name());

    List<Schema.Field> flattenedFields = new ArrayList<>();
    ImmutableList<String> lineage = ImmutableList.<String>builder()
        .addAll(parentLineage.iterator()).add(f.name()).build();

    // If field.Type = RECORD, un-nest its fields and return them
    if (Schema.Type.RECORD.equals(f.schema().getType())) {
      if (null != f.schema().getFields() && f.schema().getFields().size() > 0) {
        for (Schema.Field field : f.schema().getFields()) {
          flattenedFields.addAll(flattenField(field, lineage, true, flattenComplexTypes, Optional.<Schema>absent()));
        }
      }
    }
    // If field.Type = OPTION, un-nest its fields and return them
    else {
      Optional<Schema> optionalRecord = isOfOptionType(f.schema());
      if (optionalRecord.isPresent()) {
        Schema record = optionalRecord.get();
        if (record.getFields().size() > 0) {
          for (Schema.Field field : record.getFields()) {
            flattenedFields.addAll(flattenField(field, lineage, true, flattenComplexTypes, Optional.of(f.schema())));
          }
        }
      }
      // If field.Type = any-other, copy and return it
      else {
        // Compute name and source using lineage
        String flattenName = f.name();
        String flattenSource = StringUtils.EMPTY;
        if (shouldPopulateLineage) {
          flattenName = StringUtils.join(lineage, flattenedNameJoiner);
          flattenSource = StringUtils.join(lineage, flattenedSourceJoiner);
        }
        // Copy field
        Schema flattenedFieldSchema = flatten(f.schema(), shouldPopulateLineage, flattenComplexTypes);
        if (shouldWrapInOption.isPresent()) {
          boolean isNullFirstMember = Schema.Type.NULL.equals(shouldWrapInOption.get().getTypes().get(0).getType());
          // If already Union, just copy it instead of wrapping (Union within Union is not supported)
          if (Schema.Type.UNION.equals(flattenedFieldSchema.getType())) {
            List<Schema> newUnionMembers = new ArrayList<>();
            if (isNullFirstMember) {
              newUnionMembers.add(Schema.create(Schema.Type.NULL));
            }
            for (Schema type : flattenedFieldSchema.getTypes()) {
              if (Schema.Type.NULL.equals(type.getType())) {
                continue;
              }
              newUnionMembers.add(type);
            }
            if (!isNullFirstMember) {
              newUnionMembers.add(Schema.create(Schema.Type.NULL));
            }

            flattenedFieldSchema = Schema.createUnion(newUnionMembers);
          }
          // Wrap the Union, since parent Union is an option
          else {
            // If the field within the parent Union has a non-null default value, then null should not be the first member
            if (f.hasDefaultValue() && f.defaultVal() != null) {
              isNullFirstMember = false;
            }
            if (isNullFirstMember) {
              flattenedFieldSchema =
                  Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), flattenedFieldSchema));
            } else {
              flattenedFieldSchema =
                  Schema.createUnion(Arrays.asList(flattenedFieldSchema, Schema.create(Schema.Type.NULL)));
            }
          }
        }
        Schema.Field field = AvroCompatibilityHelper.createSchemaField(flattenName, flattenedFieldSchema, f.doc(),
            AvroUtils.getCompatibleDefaultValue(f), f.order());

        if (StringUtils.isNotBlank(flattenSource)) {
          field.addProp(FLATTENED_SOURCE_KEY, flattenSource);
        }
        // Avro 1.9 compatible change - replaced deprecated public api getJsonProps with AvroCompatibilityHelper methods
        AvroSchemaUtils.copyFieldProperties(f, field);
        flattenedFields.add(field);
      }
    }

    return flattenedFields;
  }