in config/src/main/java/org/apache/karaf/cellar/config/ConfigurationSynchronizer.java [185:255]
public void push(Group group) {
if (eventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
LOGGER.warn("CELLAR CONFIG: cluster event producer is OFF");
return;
}
if (group != null) {
String groupName = group.getName();
LOGGER.debug("CELLAR CONFIG: pushing configurations to cluster group {}", groupName);
Map<String, Properties> clusterConfigurations = clusterManager.getMap(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + groupName);
ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
Configuration[] localConfigurations;
try {
localConfigurations = configurationAdmin.listConfigurations(null);
// push local configurations to the cluster
for (Configuration localConfiguration : localConfigurations) {
String pid = localConfiguration.getPid();
// check if the pid is marked as local.
if (isAllowed(group, Constants.CATEGORY, pid, EventType.OUTBOUND)) {
Dictionary localDictionary = localConfiguration.getProperties();
localDictionary = filter(localDictionary);
if (!clusterConfigurations.containsKey(pid)) {
LOGGER.debug("CELLAR CONFIG: creating configuration pid {} on the cluster", pid);
// update cluster configurations
clusterConfigurations.put(pid, dictionaryToProperties(localDictionary));
// send cluster event
ClusterConfigurationEvent event = new ClusterConfigurationEvent(pid);
event.setSourceGroup(group);
event.setSourceNode(clusterManager.getNode());
event.setLocal(clusterManager.getNode());
eventProducer.produce(event);
} else {
Dictionary clusterDictionary = clusterConfigurations.get(pid);
if (!equals(clusterDictionary, localDictionary) && canDistributeConfig(localDictionary)) {
LOGGER.debug("CELLAR CONFIG: updating configuration pid {} on the cluster", pid);
// update cluster configurations
clusterConfigurations.put(pid, dictionaryToProperties(localDictionary));
// send cluster event
ClusterConfigurationEvent event = new ClusterConfigurationEvent(pid);
event.setSourceGroup(group);
event.setLocal(clusterManager.getNode());
event.setSourceNode(clusterManager.getNode());
eventProducer.produce(event);
}
}
} else
LOGGER.trace("CELLAR CONFIG: configuration with PID {} is marked BLOCKED OUTBOUND for cluster group {}", pid, groupName);
}
// clean configurations on the cluster not present locally
for (String pid : clusterConfigurations.keySet()) {
if (isAllowed(group, Constants.CATEGORY, pid, EventType.OUTBOUND)) {
if (findLocalConfiguration(pid,clusterConfigurations.get(pid)) == null) {
clusterConfigurations.remove(pid);
}
}
}
getSynchronizerMap().putIfAbsent(Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + groupName, true);
} catch (IOException ex) {
LOGGER.error("CELLAR CONFIG: failed to read configuration (IO error)", ex);
} catch (InvalidSyntaxException ex) {
LOGGER.error("CELLAR CONFIG: failed to read configuration (invalid filter syntax)", ex);
}
} finally {
Thread.currentThread().setContextClassLoader(originalClassLoader);
}
}
}