in config/src/main/java/org/apache/karaf/cellar/config/LocalConfigurationListener.java [45:117]
/**
 * Handles a local configuration change and mirrors it into the cluster.
 * <p>
 * Nothing is done when this listener is not enabled or when the cluster event
 * producer switch is OFF. Otherwise, for every local group that allows the PID
 * for outbound traffic:
 * <ul>
 *   <li>on {@code CM_DELETED}, if the cluster map of the group holds the PID, the
 *       entry for the PID is replaced by its deleted marker; when that entry
 *       carries a {@code KARAF_CELLAR_FILENAME} value, every other entry with the
 *       same value is replaced by its deleted marker too. A single cluster event
 *       is then produced for the PID;</li>
 *   <li>on any other event type, the local configuration is read, filtered and,
 *       if it differs from the cluster copy and may be distributed, stored in
 *       the cluster map before a cluster event is produced for the PID.</li>
 * </ul>
 * Any exception raised while processing one group is logged and does not
 * prevent the remaining groups from being processed.
 *
 * @param event the local configuration event.
 */
public void configurationEvent(ConfigurationEvent event) {
    if (!isEnabled()) {
        LOGGER.trace("CELLAR CONFIG: local listener is disabled");
        return;
    }

    // check if the producer is ON
    if (eventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
        LOGGER.debug("CELLAR CONFIG: cluster event producer is OFF");
        return;
    }

    String pid = event.getPid();

    Set<Group> groups = groupManager.listLocalGroups();
    if (groups == null || groups.isEmpty()) {
        return;
    }

    for (Group group : groups) {
        // check if the pid is allowed for outbound.
        if (!isAllowed(group, Constants.CATEGORY, pid, EventType.OUTBOUND)) {
            LOGGER.trace("CELLAR CONFIG: configuration with PID {} is marked BLOCKED OUTBOUND for cluster group {}", pid, group.getName());
            continue;
        }

        Map<String, Properties> clusterConfigurations = clusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + group.getName());

        try {
            if (event.getType() == ConfigurationEvent.CM_DELETED) {
                // single lookup: avoids a window between containsKey() and get()
                Properties deletedConfiguration = clusterConfigurations.get(pid);
                if (deletedConfiguration == null) {
                    // the PID is unknown in this cluster group, nothing to mark or announce
                    continue;
                }

                // the deleted PID itself is always marked, whether or not it carries a filename
                List<String> matchingPids = new ArrayList<String>();
                matchingPids.add(pid);

                // The filename property may be absent. Only look for sibling PIDs when it
                // is present: calling equals() on a null filename would throw, and treating
                // "no filename" as a match would mark unrelated configurations as deleted.
                Object filename = deletedConfiguration.get(KARAF_CELLAR_FILENAME);
                if (filename != null) {
                    // collect first, update afterwards: the map is not modified while iterating
                    for (Map.Entry<String, Properties> entry : clusterConfigurations.entrySet()) {
                        Properties candidate = entry.getValue();
                        if (candidate != null
                                && !pid.equals(entry.getKey())
                                && filename.equals(candidate.get(KARAF_CELLAR_FILENAME))) {
                            matchingPids.add(entry.getKey());
                        }
                    }
                }

                for (String matchingPid : matchingPids) {
                    // update the configurations in the cluster group
                    clusterConfigurations.put(matchingPid, getDeletedConfigurationMarker(clusterConfigurations.get(matchingPid)));
                }

                // send the cluster event
                ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(pid);
                clusterConfigurationEvent.setType(event.getType());
                clusterConfigurationEvent.setSourceNode(clusterManager.getNode());
                clusterConfigurationEvent.setSourceGroup(group);
                clusterConfigurationEvent.setLocal(clusterManager.getNode());
                eventProducer.produce(clusterConfigurationEvent);
            } else {
                Configuration conf = configurationAdmin.getConfiguration(pid, null);
                Dictionary localDictionary = conf.getProperties();
                localDictionary = filter(localDictionary);

                Properties distributedDictionary = clusterConfigurations.get(pid);

                // only publish when the local state actually differs from the cluster copy
                if (!equals(localDictionary, distributedDictionary) && canDistributeConfig(localDictionary)) {
                    // update the configurations in the cluster group
                    clusterConfigurations.put(pid, dictionaryToProperties(localDictionary));

                    // send the cluster event
                    ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(pid);
                    clusterConfigurationEvent.setSourceGroup(group);
                    clusterConfigurationEvent.setSourceNode(clusterManager.getNode());
                    clusterConfigurationEvent.setLocal(clusterManager.getNode());
                    eventProducer.produce(clusterConfigurationEvent);
                }
            }
        } catch (Exception e) {
            // a failure on one group must not stop the propagation to the other groups
            LOGGER.error("CELLAR CONFIG: failed to update configuration with PID {} in the cluster group {}", pid, group.getName(), e);
        }
    }
}