in src/main/java/com/amazonaws/schemamanager/SchemaManagerTask.java [65:109]
/**
 * Pushes the detected repository changes to the schema registry in three passes:
 * compatibility updates for existing subjects, new versions of existing subjects,
 * and finally registration (plus compatibility) of subjects that are new in the repository.
 *
 * <p>Every subject is handled independently: a failure is logged and the remaining
 * subjects are still processed. No exception raised while handling a single subject
 * is propagated out of this method.
 *
 * @param config              application configuration; not read by this method
 * @param newSubjectsInRepo   subject name to repository schema for subjects to register; may be {@code null}
 * @param newVersionFound     subject name to schema pair for subjects that need a new version; may be {@code null}
 * @param updateCompatibility subject name to schema pair for subjects whose compatibility level must be updated;
 *                            may be {@code null}
 */
private void deployChanges(AppConfig config, Map<String, RepoSchema> newSubjectsInRepo, Map<String, SchemaPair> newVersionFound,
        Map<String, SchemaPair> updateCompatibility) {
    CachedSchemaRegistryClient sr = registryClient.getSchemaRegistryClient();

    // First, let's update compatibility:
    if (updateCompatibility != null) {
        updateCompatibility.forEach((s, scp) -> {
            // Resolved inside the try and only read in the catch, so that building the
            // error message can never throw a second exception out of the handler.
            String level = null;
            try {
                level = scp.getRepoSchema().getMetadata().getCompatibilityLevel(s).toString();
                sr.updateCompatibility(s, level);
            } catch (Exception e) {
                log.error("Couldn't update compatibility level " + level + " for subject " + s, e);
            }
        });
    }

    // Let's deploy updates in existing schemas:
    if (newVersionFound != null) {
        newVersionFound.forEach((s, scp) -> {
            Object path = null;
            try {
                path = scp.getRepoSchema().getPath();
                sr.register(s, scp.getRepoSchema().getSchema());
            } catch (Exception e) {
                // Broad catch on purpose: a broken entry (e.g. a null schema pair) must not
                // stop the remaining subjects, matching the other two passes.
                log.error("Couldn't register subject " + s + " from repository schema at " + path, e);
            }
        });
    }

    // Let's register brand new schemas:
    if (newSubjectsInRepo != null) {
        newSubjectsInRepo.forEach((s, r) -> {
            if (s == null) return;

            Object path = null;
            try {
                path = r.getPath();
                sr.register(s, r.getSchema());
            } catch (Exception e) {
                log.error("Couldn't register subject " + s + " from repository schema at " + path, e);
            }

            // The compatibility update is attempted even if the registration above failed.
            String level = null;
            try {
                level = r.getMetadata().getCompatibilityLevel(s).toString();
                sr.updateCompatibility(s, level);
            } catch (Exception e) {
                log.error("Couldn't update compatibility level " + level + " for subject " + s, e);
            }
        });
    }
}