in src/main/java/com/amazonaws/schemamanager/registry/SchemaRegistryClient.java [179:214]
/**
 * Registers the repository's schemas with the registry, subject by subject, and prints
 * every problem encountered to standard output.
 *
 * <p>Steps:
 * <ol>
 *   <li>Group the repository schemas by subject using
 *       {@code RepoUtils.schemaToSubjectMap}.</li>
 *   <li>Index the registry subjects by {@code Subject.getName()}.</li>
 *   <li>Record an error for every subject that maps to more than one repository schema;
 *       such subjects are not registered.</li>
 *   <li>Call {@code registerSubject} for every subject that maps to exactly one schema and
 *       is not yet present in {@code resolvedVersions}, collecting the errors it returns.</li>
 * </ol>
 *
 * <p>NOTE(review): {@code resolvedVersions} is not declared in this method; it is presumably
 * state owned by the enclosing class and filled in by {@code registerSubject} - confirm.
 *
 * @param repoSchemas      schemas found in the repository; must not be {@code null}
 * @param registrySubjects subjects currently known to the registry; must not be {@code null}
 */
public void deployChanges(List<RepoSchema> repoSchemas, List<Subject> registrySubjects) {
    List<String> deployErrors = new LinkedList<>();

    // Subject name -> repository schemas claiming that subject (insertion order preserved).
    final Map<String, List<RepoSchema>> repoSubjects = new LinkedHashMap<>();
    for (RepoSchema repoSchema : repoSchemas) {
        RepoUtils.schemaToSubjectMap(repoSchema, repoSubjects);
    }

    // Subject name -> subject as currently stored in the registry.
    Map<String, Subject> regSubjectsMap = new HashMap<>();
    for (Subject regSubj : registrySubjects) {
        regSubjectsMap.put(regSubj.getName(), regSubj);
    }

    // Pass 1: report subjects with more than one candidate schema. The null guard mirrors
    // the filter used in pass 2, so a subject without a schema list is skipped here instead
    // of aborting the whole deployment with a NullPointerException.
    for (Map.Entry<String, List<RepoSchema>> entry : repoSubjects.entrySet()) {
        List<RepoSchema> candidates = entry.getValue();
        if (candidates != null && candidates.size() > 1) {
            deployErrors.add(String.format("More ambigous schemas (%d) defined for a subject %s. Schema changes will be ignored.", candidates.size(), entry.getKey()));
        }
    }

    // Pass 2: register every subject that has exactly one schema. Kept as a separate pass so
    // that ambiguity errors are always reported before registration errors.
    repoSubjects.entrySet().stream()
            .filter(entry -> entry.getValue() != null && entry.getValue().size() == 1)
            .forEach(entry -> {
                String subjectName = entry.getKey();
                // Already handled earlier, possibly as a dependency/reference of another subject.
                if (resolvedVersions.containsKey(subjectName)) {
                    return;
                }
                // registerSubject may return null, which is treated as "no errors".
                List<String> errors = registerSubject(subjectName, repoSubjects, resolvedVersions, regSubjectsMap);
                if (errors != null) {
                    deployErrors.addAll(errors);
                }
            });

    deployErrors.forEach(System.out::println);
}