in gobblin-utility/src/main/java/org/apache/gobblin/util/AvroFlattener.java [343:431]
/**
 * Flattens a single field into zero or more fields of the flattened parent schema.
 *
 * <p>How {@code f} is handled depends on its schema:
 * <ul>
 *   <li>RECORD: replaced by the recursively flattened fields of that record. If {@code f} itself lives inside an
 *       optional record ({@code shouldWrapInOption} present), that option is passed down so the nested fields are
 *       made nullable as well &mdash; they are absent whenever the enclosing option is null.</li>
 *   <li>Recognized by {@code isOfOptionType} (an optional record): replaced by the recursively flattened fields of
 *       the wrapped record, each made nullable using {@code f}'s own schema as the option.</li>
 *   <li>Anything else: copied as a single field whose schema is passed through {@code flatten}. When
 *       {@code shouldPopulateLineage} is set, the copy is named after its joined lineage and tagged with
 *       {@code FLATTENED_SOURCE_KEY}.</li>
 * </ul>
 *
 * @param f field to flatten; it, its schema and its name must be non-null
 * @param parentLineage names of the enclosing fields, outermost first
 * @param shouldPopulateLineage whether to name the copied field after its full lineage and record its source path
 * @param flattenComplexTypes forwarded unchanged to recursive calls and to {@code flatten}
 * @param shouldWrapInOption the union schema of the closest enclosing optional record, if any; when present the
 *     copied field's schema is made nullable, with {@code null} placed first or last according to that union
 * @return the flattened fields (empty when the record being un-nested has no fields)
 * @throws NullPointerException if {@code f}, its schema or its name is null
 */
private List<Schema.Field> flattenField(Schema.Field f, ImmutableList<String> parentLineage,
    boolean shouldPopulateLineage, boolean flattenComplexTypes, Optional<Schema> shouldWrapInOption) {
  Preconditions.checkNotNull(f);
  Preconditions.checkNotNull(f.schema());
  Preconditions.checkNotNull(f.name());

  List<Schema.Field> flattenedFields = new ArrayList<>();
  ImmutableList<String> lineage = ImmutableList.<String>builder()
      .addAll(parentLineage.iterator()).add(f.name()).build();

  // If field.Type = RECORD, un-nest its fields and return them
  if (Schema.Type.RECORD.equals(f.schema().getType())) {
    if (null != f.schema().getFields() && !f.schema().getFields().isEmpty()) {
      for (Schema.Field field : f.schema().getFields()) {
        // Keep the enclosing option (if any): when an ancestor optional record is null, these fields are absent too,
        // so they must stay nullable after flattening.
        flattenedFields.addAll(flattenField(field, lineage, true, flattenComplexTypes, shouldWrapInOption));
      }
    }
    return flattenedFields;
  }

  // If field.Type = OPTION, un-nest the wrapped record's fields, making each of them nullable
  Optional<Schema> optionalRecord = isOfOptionType(f.schema());
  if (optionalRecord.isPresent()) {
    for (Schema.Field field : optionalRecord.get().getFields()) {
      flattenedFields.addAll(flattenField(field, lineage, true, flattenComplexTypes, Optional.of(f.schema())));
    }
    return flattenedFields;
  }

  // If field.Type = any-other, copy it; compute name and source using lineage when requested
  String flattenName = f.name();
  String flattenSource = StringUtils.EMPTY;
  if (shouldPopulateLineage) {
    flattenName = StringUtils.join(lineage, flattenedNameJoiner);
    flattenSource = StringUtils.join(lineage, flattenedSourceJoiner);
  }

  Schema flattenedFieldSchema = flatten(f.schema(), shouldPopulateLineage, flattenComplexTypes);
  if (shouldWrapInOption.isPresent()) {
    flattenedFieldSchema = makeNullableForOption(flattenedFieldSchema, f, shouldWrapInOption.get());
  }

  Schema.Field field = AvroCompatibilityHelper.createSchemaField(flattenName, flattenedFieldSchema, f.doc(),
      AvroUtils.getCompatibleDefaultValue(f), f.order());
  if (StringUtils.isNotBlank(flattenSource)) {
    field.addProp(FLATTENED_SOURCE_KEY, flattenSource);
  }
  // Avro 1.9 compatible change - replaced deprecated public api getJsonProps with AvroCompatibilityHelper methods
  AvroSchemaUtils.copyFieldProperties(f, field);
  flattenedFields.add(field);
  return flattenedFields;
}

/**
 * Returns {@code fieldSchema} made nullable, for a field that lives inside the optional record described by
 * {@code optionSchema}.
 *
 * <p>{@code null} goes first when the first member of {@code optionSchema} is {@code null}, and last otherwise.
 * An existing union is rebuilt rather than wrapped, because Avro does not allow a union directly inside another union.
 * Its own {@code null} members are dropped and a single {@code null} is re-added at that position.
 *
 * <p>A non-union schema is wrapped in a two-member union. If {@code field} has a default value and
 * {@code field.defaultVal()} is non-null, {@code null} is placed last. This is because Avro checks a union field's
 * default against the union's first member.
 *
 * @param fieldSchema the already-flattened schema of {@code field}
 * @param field the original field, consulted only for its default value
 * @param optionSchema the enclosing option union; its first member decides where {@code null} goes
 * @return a union schema that contains {@code null}
 */
private static Schema makeNullableForOption(Schema fieldSchema, Schema.Field field, Schema optionSchema) {
  boolean isNullFirstMember = Schema.Type.NULL.equals(optionSchema.getTypes().get(0).getType());

  // If already Union, just copy it instead of wrapping (Union within Union is not supported)
  if (Schema.Type.UNION.equals(fieldSchema.getType())) {
    // NOTE(review): the field's default is not consulted on this path. If the original union lists a non-null type
    // first with a matching non-null default, moving null to the front may invalidate that default - confirm.
    List<Schema> newUnionMembers = new ArrayList<>();
    if (isNullFirstMember) {
      newUnionMembers.add(Schema.create(Schema.Type.NULL));
    }
    for (Schema type : fieldSchema.getTypes()) {
      if (!Schema.Type.NULL.equals(type.getType())) {
        newUnionMembers.add(type);
      }
    }
    if (!isNullFirstMember) {
      newUnionMembers.add(Schema.create(Schema.Type.NULL));
    }
    return Schema.createUnion(newUnionMembers);
  }

  // Wrap the schema in a union, since the parent union is an option.
  // If the field has a non-null default value, then null must not be the first member.
  if (field.hasDefaultValue() && field.defaultVal() != null) {
    isNullFirstMember = false;
  }
  if (isNullFirstMember) {
    return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), fieldSchema));
  }
  return Schema.createUnion(Arrays.asList(fieldSchema, Schema.create(Schema.Type.NULL)));
}