in src/main/java/com/amazonaws/schemamanager/analyze/SchemaAnalyzerImpl.java [224:324]
/**
 * Compares the schemas found in the repository with the subjects reported by
 * the registry and sorts every subject into one of the result collections.
 *
 * For each subject name derived from the repository schemas, in this order:
 *  - several differing schemas for the same subject: recorded as ambiguous,
 *    no further analysis for that subject;
 *  - no registry subject with that name: recorded as a new subject;
 *  - compatibility level differs between registry and repository: recorded
 *    as a compatibility update (analysis of the schema itself continues);
 *  - repository schema equals the latest registry schema: recorded as unchanged;
 *  - otherwise the repository schema is checked via {@code isCompatible} against
 *    the registry schemas using the repository's compatibility level, and is
 *    recorded either as a new version or as a compatibility error.
 *
 * Registry subjects whose name does not occur in the repository are reported
 * separately.
 *
 * @param request carries the repository schemas and the registry subjects to compare
 * @return the classification of all repository and registry subjects
 */
public CheckChangesResponse checkChanges(CheckChangesRequest request) {
    List<RepoSchema> repoSchemas = request.getRepoSchemas();
    List<Subject> subjects = request.getSubjects();

    // Repository schemas grouped by subject name.
    final Map<String, List<RepoSchema>> repoSubjects = new LinkedHashMap<>();
    repoSchemas.forEach(s -> RepoUtils.schemaToSubjectMap(s, repoSubjects));

    // Registry subjects indexed by subject name.
    Map<String, Subject> regSubjectsMap = new HashMap<>();
    subjects.forEach(regSubj -> regSubjectsMap.put(regSubj.getName(), regSubj));

    Map<String, RepoSchema> newSubjectsInRepo = new LinkedHashMap<>();
    Map<String, SchemaPair> newVersionFound = new LinkedHashMap<>();
    List<SchemaPair> unchangedSchemas = new LinkedList<>();
    Map<String, SchemaPair> updateCompatibility = new LinkedHashMap<>();
    Map<String, List<RepoSchema>> ambiguousSubjects = new LinkedHashMap<>();
    Map<String, CompatibilityError> compatibilityErrors = new LinkedHashMap<>();
    List<Subject> notInRepositorySubjects = new LinkedList<>();

    repoSubjects.forEach((s, l) -> {
        // The first schema is the reference all other schemas of the subject are compared to.
        RepoSchema repoSchema = l.get(0);

        if (l.size() > 1) {
            for (RepoSchema r : l) {
                if (r == repoSchema) {
                    continue;
                }
                if (!r.getSchema().equals(repoSchema.getSchema())) {
                    // The list is seeded with the reference schema, so an ambiguous
                    // entry always holds at least two schemas.
                    ambiguousSubjects
                            .computeIfAbsent(s, n -> new LinkedList<RepoSchema>(Arrays.asList(repoSchema)))
                            .add(r);
                }
            }
            if (ambiguousSubjects.containsKey(s)) {
                // At this point, the subject is not uniquely defined and cannot be
                // represented by the repository. Fix the repo first!
                return;
            }
        }

        Subject regSubj = regSubjectsMap.get(s);
        if (regSubj == null) {
            // New subject found: the registry does not know this name yet.
            newSubjectsInRepo.put(s, repoSchema);
            return;
        }

        // Null-safe comparison: a registry subject without a compatibility level
        // must not abort the whole analysis with a NullPointerException.
        if (!java.util.Objects.equals(
                regSubj.getCompatibility(),
                repoSchema.getMetadata().getCompatibilityLevel(s))) {
            updateCompatibility.put(s, new SchemaPair(repoSchema, regSubj));
        }

        // By this time we have a single schema in the list, or all of them are
        // the same, so it is safe to work with the first one.
        ParsedSchema regSubjSchema = regSubj.getLatestSchema();
        if (repoSchema.getSchema().equals(regSubjSchema)) {
            // If schemas are the same, there is nothing to update in the registry.
            unchangedSchemas.add(new SchemaPair(repoSchema, regSubj));
            return;
        }

        // Test compatibility against the level set in the repository, because
        // that is the level the registry will be updated to.
        List<String> compErrors = repoSchema.getSchema().isCompatible(
                repoSchema.getMetadata().getCompatibilityLevel(s),
                regSubj.getSchemas());
        if (compErrors != null && !compErrors.isEmpty()) {
            compatibilityErrors.put(s, new CompatibilityError(s, repoSchema, compErrors));
            return;
        }

        if (RepoUtils.equalsIgnoreRefs(repoSchema.getSchema(), regSubj.getLatestSchema())) {
            // Schema already exists and is up to date.
            // NOTE(review): such subjects are not added to unchangedSchemas - confirm this is intended.
            return;
        }
        newVersionFound.put(s, new SchemaPair(repoSchema, regSubj));
    });

    // Registry subjects that have no counterpart in the repository.
    regSubjectsMap.forEach((sn, s) -> {
        if (!repoSubjects.containsKey(sn)) {
            notInRepositorySubjects.add(s);
        }
    });

    return new CheckChangesResponse(
            newSubjectsInRepo,
            newVersionFound,
            unchangedSchemas,
            updateCompatibility,
            ambiguousSubjects,
            compatibilityErrors,
            notInRepositorySubjects
    );
}