in gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java [1033:1115]
private static Schema dropRecursive(SchemaEntry schemaEntry, List<SchemaEntry> parents, List<SchemaEntry> fieldsWithRecursion) {
Schema schema = schemaEntry.schema;
// ignore primitive fields
switch (schema.getType()) {
case UNION:{
List<Schema> unionTypes = schema.getTypes();
List<Schema> copiedUnionTypes = new ArrayList<Schema>();
for (Schema unionSchema: unionTypes) {
SchemaEntry unionSchemaEntry = new SchemaEntry(
schemaEntry.fieldName, unionSchema);
copiedUnionTypes.add(dropRecursive(unionSchemaEntry, parents, fieldsWithRecursion));
}
if (copiedUnionTypes.stream().anyMatch(x -> x == null)) {
// one or more types in the union are referring to a parent type (directly recursive),
// entire union must be dropped
return null;
}
else {
Schema copySchema = Schema.createUnion(copiedUnionTypes);
copyProperties(schema, copySchema);
return copySchema;
}
}
case RECORD:{
// check if the type of this schema matches any in the parents list
if (parents.stream().anyMatch(parent -> parent.fullyQualifiedType().equals(schemaEntry.fullyQualifiedType()))) {
fieldsWithRecursion.add(schemaEntry);
return null;
}
List<SchemaEntry> newParents = new ArrayList<>(parents);
newParents.add(schemaEntry);
List<Schema.Field> copiedSchemaFields = new ArrayList<>();
for (Schema.Field field: schema.getFields()) {
String fieldName = schemaEntry.fieldName != null ? schemaEntry.fieldName + "." + field.name() : field.name();
SchemaEntry fieldSchemaEntry = new SchemaEntry(fieldName, field.schema());
Schema copiedFieldSchema = dropRecursive(fieldSchemaEntry, newParents, fieldsWithRecursion);
if (copiedFieldSchema == null) {
} else {
Schema.Field copiedField = AvroCompatibilityHelper.createSchemaField(field.name(), copiedFieldSchema,
field.doc(), getCompatibleDefaultValue(field), field.order());
copyFieldProperties(field, copiedField);
copiedSchemaFields.add(copiedField);
}
}
if (copiedSchemaFields.size() > 0) {
Schema copiedRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(),
schema.isError());
copiedRecord.setFields(copiedSchemaFields);
copyProperties(schema, copiedRecord);
return copiedRecord;
} else {
return null;
}
}
case ARRAY: {
Schema itemSchema = schema.getElementType();
SchemaEntry itemSchemaEntry = new SchemaEntry(schemaEntry.fieldName, itemSchema);
Schema copiedItemSchema = dropRecursive(itemSchemaEntry, parents, fieldsWithRecursion);
if (copiedItemSchema == null) {
return null;
} else {
Schema copiedArraySchema = Schema.createArray(copiedItemSchema);
copyProperties(schema, copiedArraySchema);
return copiedArraySchema;
}
}
case MAP: {
Schema valueSchema = schema.getValueType();
SchemaEntry valueSchemaEntry = new SchemaEntry(schemaEntry.fieldName, valueSchema);
Schema copiedValueSchema = dropRecursive(valueSchemaEntry, parents, fieldsWithRecursion);
if (copiedValueSchema == null) {
return null;
} else {
Schema copiedMapSchema = Schema.createMap(copiedValueSchema);
copyProperties(schema, copiedMapSchema);
return copiedMapSchema;
}
}
default: {
return schema;
}
}
}