in services/src/main/java/org/apache/unomi/services/impl/cluster/ClusterServiceImpl.java [112:174]
/**
 * Initializes the cluster service.
 * <p>
 * When both the Karaf Cellar event producer and the cluster manager are set, this method:
 * <ol>
 *   <li>looks up the cluster group named {@code karafCellarGroupName} and tries to create it
 *       when the lookup returns {@code null};</li>
 *   <li>checks that the event producer switch is not {@code OFF} and that the cluster node
 *       configuration PID is allowed outbound for the group;</li>
 *   <li>if every check passed, stores this node's public and internal addresses in the
 *       cluster node configuration held in the Cellar configuration map and sends a
 *       {@link ClusterConfigurationEvent} for it;</li>
 *   <li>schedules the recurring system statistics update, whether or not the checks passed.</li>
 * </ol>
 * A failed check is logged and causes the endpoint registration step to be skipped.
 * When the event producer or the cluster manager is not set, nothing is configured or scheduled.
 */
public void init() {
    if (karafCellarEventProducer != null && karafCellarClusterManager != null) {
        boolean setupConfigOk = true;

        if (karafCellarGroupManager == null) {
            // Without a group manager the group can neither be looked up nor created;
            // report it instead of failing with a NullPointerException.
            LOGGER.error("Cluster group manager is not available, cannot look up cluster group {}", karafCellarGroupName);
            setupConfigOk = false;
        } else {
            group = karafCellarGroupManager.findGroupByName(karafCellarGroupName);
            if (group == null) {
                // A missing group is recoverable, so this is a warning rather than an error.
                LOGGER.warn("Cluster group {} doesn't exist, creating it...", karafCellarGroupName);
                group = karafCellarGroupManager.createGroup(karafCellarGroupName);
                if (group == null) {
                    LOGGER.error("Unable to create cluster group {}", karafCellarGroupName);
                    setupConfigOk = false;
                }
            }
        }

        // check if the producer is ON (constant-first equals tolerates a null status)
        if (setupConfigOk && SwitchStatus.OFF.equals(karafCellarEventProducer.getSwitch().getStatus())) {
            LOGGER.error("Cluster event producer is OFF");
            setupConfigOk = false;
        }

        // check if the config pid is allowed
        if (setupConfigOk && !isClusterConfigPIDAllowed(group, Constants.CATEGORY, KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, EventType.OUTBOUND)) {
            LOGGER.error("Configuration PID {} is blocked outbound for cluster group {}",
                    KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarGroupName);
            setupConfigOk = false;
        }

        if (setupConfigOk) {
            Map<String, Properties> configurations = karafCellarClusterManager.getMap(
                    Constants.CONFIGURATION_MAP + Configurations.SEPARATOR + karafCellarGroupName);
            org.apache.karaf.cellar.core.Node thisKarafNode = karafCellarClusterManager.getNode();
            String nodeId = thisKarafNode.getId();

            Properties karafCellarClusterNodeConfiguration = configurations.get(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
            if (karafCellarClusterNodeConfiguration == null) {
                karafCellarClusterNodeConfiguration = new Properties();
            }

            // Add (or overwrite) this node's entry in the public endpoints property.
            Map<String, String> publicEndpoints = getMapProperty(karafCellarClusterNodeConfiguration,
                    KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, nodeId + "=" + publicAddress);
            publicEndpoints.put(nodeId, publicAddress);
            setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_PUBLIC_ENDPOINTS, publicEndpoints);

            // Same for the internal endpoints property.
            Map<String, String> internalEndpoints = getMapProperty(karafCellarClusterNodeConfiguration,
                    KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, nodeId + "=" + internalAddress);
            internalEndpoints.put(nodeId, internalAddress);
            setMapProperty(karafCellarClusterNodeConfiguration, KARAF_CLUSTER_CONFIGURATION_INTERNAL_ENDPOINTS, internalEndpoints);

            // Write the updated configuration back to the map, then send the configuration event.
            configurations.put(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION, karafCellarClusterNodeConfiguration);
            ClusterConfigurationEvent clusterConfigurationEvent = new ClusterConfigurationEvent(KARAF_CELLAR_CLUSTER_NODE_CONFIGURATION);
            sendEvent(clusterConfigurationEvent);
        }

        TimerTask statisticsTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    updateSystemStats();
                } catch (Throwable t) {
                    // Log and continue: an exception escaping run() would cancel all
                    // subsequent executions of the scheduled task.
                    LOGGER.error("Error updating system statistics", t);
                }
            }
        };
        schedulerService.getScheduleExecutorService().scheduleWithFixedDelay(
                statisticsTask, 0, nodeStatisticsUpdateFrequency, TimeUnit.MILLISECONDS);
    }
    LOGGER.info("Cluster service initialized.");
}