in hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java [329:450]
/**
 * Computes the compatibility result for reading data written with {@code writer}
 * using {@code reader}.
 *
 * <p>When both schemas share the same type, primitives are compatible as-is, containers
 * delegate to their element/value schemas, named types additionally check their names,
 * and unions require every writer branch to be readable by the reader.
 *
 * <p>When the types differ, a writer union is accepted only if the reader is compatible
 * with each of its branches; otherwise the reader type decides whether the writer type
 * is an accepted promotion, and a reader union is accepted as soon as one of its
 * branches is compatible with the writer.
 *
 * @param reader    schema used to read the data
 * @param writer    schema the data was written with
 * @param locations location stack forwarded to the nested checks and attached to
 *                  any incompatibility that is reported
 * @return the merged compatibility result
 * @throws AvroRuntimeException if the reader type is not handled by either switch
 */
private SchemaCompatibilityResult calculateCompatibility(final Schema reader, final Schema writer,
                                                         final Deque<LocationInfo> locations) {
  final Schema.Type readerType = reader.getType();
  final Schema.Type writerType = writer.getType();
  SchemaCompatibilityResult outcome = SchemaCompatibilityResult.compatible();

  if (readerType != writerType) {
    // Reader and writer have different schema types.
    if (writerType == Schema.Type.UNION) {
      // The reader must be able to decode every branch of the writer union.
      for (Schema writerBranch : writer.getTypes()) {
        outcome = outcome.mergedWith(getCompatibility(reader, writerBranch, locations));
      }
      return outcome;
    }

    // Whether the writer type may be read as the (different) reader type.
    boolean promotable;
    switch (readerType) {
      case NULL:
      case BOOLEAN:
      case INT:
      case ARRAY:
      case MAP:
      case FIXED:
      case ENUM:
      case RECORD:
        // These reader types accept no writer type other than their own.
        promotable = false;
        break;
      case LONG:
        promotable = writerType == Type.INT;
        break;
      case FLOAT:
        promotable = writerType == Type.INT || writerType == Type.LONG;
        break;
      case DOUBLE:
        promotable = writerType == Type.INT || writerType == Type.LONG || writerType == Type.FLOAT;
        break;
      case BYTES:
        promotable = writerType == Type.STRING;
        break;
      case STRING:
        promotable = isTypeNumeric(writerType) || writerType == Schema.Type.BYTES;
        break;
      case UNION: {
        // A single compatible reader branch is enough to decode the writer schema.
        for (final Schema candidate : reader.getTypes()) {
          SchemaCompatibilityResult branchResult = getCompatibility(candidate, writer, locations);
          if (branchResult.getCompatibility() == SchemaCompatibilityType.COMPATIBLE) {
            return outcome;
          }
        }
        // No reader branch matched the writer schema.
        String message = String.format("reader union lacking writer type: %s for field: '%s'",
            writerType, getLocationName(locations, readerType));
        SchemaCompatibilityResult missingBranch = SchemaCompatibilityResult.incompatible(
            SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, writer, message, asList(locations));
        return outcome.mergedWith(missingBranch);
      }
      default:
        throw new AvroRuntimeException("Unknown schema type: " + readerType);
    }

    if (promotable) {
      return outcome;
    }
    return outcome.mergedWith(typeMismatch(reader, writer, locations));
  }

  // Reader and writer share the same schema type.
  switch (readerType) {
    case NULL:
    case BOOLEAN:
    case INT:
    case LONG:
    case FLOAT:
    case DOUBLE:
    case BYTES:
    case STRING:
      return outcome;
    case ARRAY:
      return outcome.mergedWith(
          getCompatibility(reader.getElementType(), writer.getElementType(), locations));
    case MAP:
      return outcome.mergedWith(
          getCompatibility(reader.getValueType(), writer.getValueType(), locations));
    case FIXED:
      return outcome
          .mergedWith(checkSchemaNames(reader, writer, locations))
          .mergedWith(checkFixedSize(reader, writer, locations));
    case ENUM:
      return outcome
          .mergedWith(checkSchemaNames(reader, writer, locations))
          .mergedWith(checkReaderEnumContainsAllWriterEnumSymbols(reader, writer, locations));
    case RECORD:
      return outcome
          .mergedWith(checkSchemaNames(reader, writer, locations))
          .mergedWith(checkReaderWriterRecordFields(reader, writer, locations));
    case UNION: {
      // Each writer branch that the reader cannot decode adds one incompatibility.
      for (final Schema writerBranch : writer.getTypes()) {
        SchemaCompatibilityResult branchResult = getCompatibility(reader, writerBranch, locations);
        if (branchResult.getCompatibility() != SchemaCompatibilityType.INCOMPATIBLE) {
          continue;
        }
        String message = String.format("reader union lacking writer type: %s for field: '%s'",
            writerBranch.getType(), getLocationName(locations, readerType));
        outcome = outcome.mergedWith(SchemaCompatibilityResult.incompatible(
            SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, writer, message, asList(locations)));
      }
      return outcome;
    }
    default:
      throw new AvroRuntimeException("Unknown schema type: " + readerType);
  }
}