private SchemaCompatibilityResult calculateCompatibility()

in hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java [329:450]


    private SchemaCompatibilityResult calculateCompatibility(final Schema reader, final Schema writer,
                                                             final Deque<LocationInfo> locations) {
      SchemaCompatibilityResult result = SchemaCompatibilityResult.compatible();

      if (reader.getType() == writer.getType()) {
        switch (reader.getType()) {
          case NULL:
          case BOOLEAN:
          case INT:
          case LONG:
          case FLOAT:
          case DOUBLE:
          case BYTES:
          case STRING: {
            return result;
          }
          case ARRAY: {
            return result.mergedWith(getCompatibility(reader.getElementType(), writer.getElementType(), locations));
          }
          case MAP: {
            return result.mergedWith(getCompatibility(reader.getValueType(), writer.getValueType(), locations));
          }
          case FIXED: {
            result = result.mergedWith(checkSchemaNames(reader, writer, locations));
            return result.mergedWith(checkFixedSize(reader, writer, locations));
          }
          case ENUM: {
            result = result.mergedWith(checkSchemaNames(reader, writer, locations));
            return result.mergedWith(checkReaderEnumContainsAllWriterEnumSymbols(reader, writer, locations));
          }
          case RECORD: {
            result = result.mergedWith(checkSchemaNames(reader, writer, locations));
            return result.mergedWith(checkReaderWriterRecordFields(reader, writer, locations));
          }
          case UNION: {
            // Check that each individual branch of the writer union can be decoded:
            for (final Schema writerBranch : writer.getTypes()) {
              SchemaCompatibilityResult compatibility = getCompatibility(reader, writerBranch, locations);
              if (compatibility.getCompatibility() == SchemaCompatibilityType.INCOMPATIBLE) {
                String message = String.format("reader union lacking writer type: %s for field: '%s'", writerBranch.getType(), getLocationName(locations, reader.getType()));
                result = result.mergedWith(SchemaCompatibilityResult.incompatible(
                    SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, writer, message, asList(locations)));
              }
            }
            // Each schema in the writer union can be decoded with the reader:
            return result;
          }

          default: {
            throw new AvroRuntimeException("Unknown schema type: " + reader.getType());
          }
        }

      } else {
        // Reader and writer have different schema types:

        // Reader compatible with all branches of a writer union is compatible
        if (writer.getType() == Schema.Type.UNION) {
          for (Schema s : writer.getTypes()) {
            result = result.mergedWith(getCompatibility(reader, s, locations));
          }
          return result;
        }

        switch (reader.getType()) {
          case NULL:
            return result.mergedWith(typeMismatch(reader, writer, locations));
          case BOOLEAN:
            return result.mergedWith(typeMismatch(reader, writer, locations));
          case INT:
            return result.mergedWith(typeMismatch(reader, writer, locations));
          case LONG: {
            return (writer.getType() == Type.INT) ? result : result.mergedWith(typeMismatch(reader, writer, locations));
          }
          case FLOAT: {
            return ((writer.getType() == Type.INT) || (writer.getType() == Type.LONG)) ? result
                : result.mergedWith(typeMismatch(reader, writer, locations));

          }
          case DOUBLE: {
            return ((writer.getType() == Type.INT) || (writer.getType() == Type.LONG) || (writer.getType() == Type.FLOAT))
                ? result
                : result.mergedWith(typeMismatch(reader, writer, locations));
          }
          case BYTES: {
            return (writer.getType() == Type.STRING) ? result : result.mergedWith(typeMismatch(reader, writer, locations));
          }
          case STRING: {
            return (isTypeNumeric(writer.getType()) || (writer.getType() == Schema.Type.BYTES)
                ? result : result.mergedWith(typeMismatch(reader, writer, locations)));
          }

          case ARRAY:
            return result.mergedWith(typeMismatch(reader, writer, locations));
          case MAP:
            return result.mergedWith(typeMismatch(reader, writer, locations));
          case FIXED:
            return result.mergedWith(typeMismatch(reader, writer, locations));
          case ENUM:
            return result.mergedWith(typeMismatch(reader, writer, locations));
          case RECORD:
            return result.mergedWith(typeMismatch(reader, writer, locations));
          case UNION: {
            for (final Schema readerBranch : reader.getTypes()) {
              SchemaCompatibilityResult compatibility = getCompatibility(readerBranch, writer, locations);
              if (compatibility.getCompatibility() == SchemaCompatibilityType.COMPATIBLE) {
                return result;
              }
            }
            // No branch in the reader union has been found compatible with the writer
            // schema:
            String message = String.format("reader union lacking writer type: %s for field: '%s'", writer.getType(), getLocationName(locations, reader.getType()));
            return result.mergedWith(SchemaCompatibilityResult
                .incompatible(SchemaIncompatibilityType.MISSING_UNION_BRANCH, reader, writer, message, asList(locations)));
          }

          default: {
            throw new AvroRuntimeException("Unknown schema type: " + reader.getType());
          }
        }
      }
    }