private void deployChanges()

in src/main/java/com/amazonaws/schemamanager/SchemaManagerTask.java [65:109]


	private void deployChanges(AppConfig config, Map<String, RepoSchema> newSubjectsInRepo, Map<String, SchemaPair> newVersionFound,
			Map<String, SchemaPair> updateCompatibility) {
		
		// First, let's update compatibility:
		CachedSchemaRegistryClient sr = registryClient.getSchemaRegistryClient();
		if (updateCompatibility != null) {
			updateCompatibility.forEach((s, scp) -> {
				try {
					sr.updateCompatibility(s, scp.getRepoSchema().getMetadata().getCompatibilityLevel(s).toString());
				} catch (Exception e) {
					log.error("Couldn't update compatibility level " + scp.getRepoSchema().getMetadata().getCompatibilityLevel(s).toString() + " for subject " + s, e);
				}
			});
		}
		
		
		// Let's deploy updates in existing schemas:
		if (newVersionFound != null) {
			newVersionFound.forEach((s, scp) -> {
				try {
					sr.register(s, scp.getRepoSchema().getSchema());
				} catch (IOException | RestClientException e) {
					log.error("Couldn't register subject " + s + "from repository schema at " + scp.getRepoSchema().getPath(), e);
				}
			});
		}
		
		// Let's register brand new schemas:
		if (newSubjectsInRepo != null) {
			newSubjectsInRepo.forEach((s, r) -> {
				if (s == null) return;
				try {
					sr.register(s, r.getSchema());
				} catch (Exception e) {
					log.error("Couldn't register subject " + s + "from repository schema at " + r.getPath(), e);
				}
				try {
					sr.updateCompatibility(s, r.getMetadata().getCompatibilityLevel(s).toString());
				} catch (Exception e) {
					log.error("Couldn't update compatibility level " + r.getMetadata().getCompatibilityLevel(s).toString() + " for subject " + s, e);
				}
			});
		}
		
	}