in src/main/java/com/amazonaws/schemamanager/registry/SchemaRegistryClient.java [225:310]
/**
 * Registers the repository schema of {@code subjectName} in the schema registry,
 * after first recursively registering every subject it depends on.
 *
 * <p>Processing order:
 * <ol>
 *   <li>Return immediately if the subject is already present in {@code resolvedVersions}.</li>
 *   <li>Register each dependency (recursively) and call {@code updateDependency} for it.</li>
 *   <li>If the subject already exists and its compatibility level differs from the one
 *       declared in the repository, push the repository level to the registry and
 *       re-fetch the subject.</li>
 *   <li>If the latest registered schema equals the repository schema, cache the subject
 *       and return.</li>
 *   <li>Otherwise check compatibility against the already registered schemas, register
 *       the new schema, re-fetch the subject and cache it.</li>
 * </ol>
 *
 * <p>Failures are collected as messages rather than thrown; a failed registry call does
 * not abort the remaining steps.
 *
 * @param subjectName      name of the subject to register
 * @param repoSubjects     repository schemas keyed by subject name; only the first element
 *                         of each list is used here
 * @param resolvedVersions cache of subjects already handled; updated in place
 * @param registrySubjects fallback lookup for the subject when it is not yet in
 *                         {@code resolvedVersions} (presumably a snapshot of the registry
 *                         state - confirm with the caller)
 * @return the error messages collected for this subject and its dependencies; empty when
 *         nothing went wrong
 */
private List<String> registerSubject(String subjectName,
        Map<String, List<RepoSchema>> repoSubjects,
        Map<String, Subject> resolvedVersions,
        Map<String, Subject> registrySubjects) {
    List<String> allErrors = new LinkedList<>();
    if (resolvedVersions.containsKey(subjectName)) {
        return allErrors;
    }

    // A subject without a repository entry cannot be registered; report it through the
    // error list instead of failing with a context-free NullPointerException.
    List<RepoSchema> repoCandidates = repoSubjects.get(subjectName);
    if (repoCandidates == null || repoCandidates.isEmpty()) {
        allErrors.add(String.format("Subject %s is not defined in the repository, nothing to register.", subjectName));
        return allErrors;
    }
    RepoSchema repoSchema = repoCandidates.get(0);

    // First, register all dependencies, so they will be in the registry.
    List<String> dependencies = getDependencies(repoSchema, repoSubjects, resolvedVersions, registrySubjects);
    for (String dependency : dependencies) {
        allErrors.addAll(registerSubject(dependency, repoSubjects, resolvedVersions, registrySubjects));
        updateDependency(repoSchema, dependency, registrySubjects);
    }

    Subject regSubj = resolvedVersions.get(subjectName);
    if (regSubj == null) {
        regSubj = registrySubjects.get(subjectName);
    }

    // Update compatibility before registering a new version of the schema.
    // Only equals()/toString() are needed on the level, so it is held as Object here.
    Object repoLevel = repoSchema.getMetadata().getCompatibilityLevel(subjectName);
    if (regSubj != null && repoLevel != null) {
        Object registryLevel = regSubj.getCompatibility();
        // A missing registry-side level counts as "different" so the repository level is pushed.
        if (registryLevel == null || !registryLevel.equals(repoLevel)) {
            String repoLevelName = repoLevel.toString();
            try {
                cSchemaRegistryclient.updateCompatibility(subjectName, repoLevelName);
            } catch (Exception e) {
                allErrors.add(
                        String.format("Couldn't update compatibility level %s for subject %s. Exception: %s",
                                repoLevelName,
                                subjectName,
                                e.getMessage()));
            }
            // Get a fresh subject from the registry and update the cache.
            try {
                regSubj = getSubject(subjectName);
                resolvedVersions.put(subjectName, regSubj);
            } catch (Exception e) {
                allErrors.add(String.format("Unexpected exception while fetching of %s. Exception: %s", subjectName, e.getMessage()));
            }
        }
    }

    // Nothing to register when the latest registered schema already matches the repository.
    // The repository schema is the receiver so a null latest schema is handled safely.
    if (regSubj != null && repoSchema.getSchema().equals(regSubj.getLatestSchema())) {
        resolvedVersions.put(subjectName, regSubj);
        return allErrors;
    }

    // Test compatibility against the level set in the repository, because that is the
    // level the registry is updated to. A brand-new subject has nothing to compare against.
    if (regSubj != null) {
        List<String> compErrors = repoSchema.getSchema().isCompatible(
                repoSchema.getMetadata().getCompatibilityLevel(subjectName),
                regSubj.getSchemas());
        if (compErrors != null && !compErrors.isEmpty()) {
            allErrors.addAll(compErrors);
            return allErrors;
        }
    }

    try {
        cSchemaRegistryclient.register(subjectName, repoSchema.getSchema());
    } catch (Exception e) {
        allErrors.add(String.format("Unexpected exception during registration of %s. Exception: %s", subjectName, e.getMessage()));
    }
    // Get a fresh subject from the registry and update the cache, even after a failed
    // registration, so the subject is not processed again.
    try {
        regSubj = getSubject(subjectName);
        resolvedVersions.put(subjectName, regSubj);
    } catch (Exception e) {
        allErrors.add(String.format("Unexpected exception while fetching %s. Exception: %s", subjectName, e.getMessage()));
    }
    return allErrors;
}