in bundle/src/main/java/org/apache/karaf/cellar/bundle/BundleSynchronizer.java [228:325]
/**
 * Push the state of the local bundles to the given cluster group.
 * <p>
 * Nothing is done when the cluster event producer switch is OFF (a warning is logged)
 * or when {@code group} is {@code null}. Otherwise, for every local bundle whose location
 * is allowed OUTBOUND for the group:
 * <ul>
 *   <li>a bundle missing from the cluster map is added and a {@link ClusterBundleEvent} is produced;</li>
 *   <li>a bundle whose cluster status differs from the local status is updated and an event is produced.</li>
 * </ul>
 * Cluster entries that are allowed OUTBOUND but have no matching local bundle are then removed
 * from the cluster map, and the group is flagged in the synchronizer map.
 * <p>
 * The thread context class loader is switched to this class' class loader for the duration
 * of the push and always restored afterwards.
 *
 * @param group the cluster group to push the local bundles to; may be {@code null} (no-op).
 */
public void push(Group group) {
    if (eventProducer.getSwitch().getStatus().equals(SwitchStatus.OFF)) {
        LOGGER.warn("CELLAR BUNDLE: cluster event producer is OFF");
        return;
    }
    if (group == null) {
        return;
    }

    String groupName = group.getName();
    LOGGER.debug("CELLAR BUNDLE: pushing bundles to cluster group {}", groupName);
    Map<String, BundleState> clusterBundles = clusterManager.getMap(Constants.BUNDLE_MAP + Configurations.SEPARATOR + groupName);

    ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
        BundleContext bundleContext = ((BundleReference) getClass().getClassLoader()).getBundle().getBundleContext();

        // push local bundles to the cluster
        for (Bundle bundle : bundleContext.getBundles()) {
            long bundleId = bundle.getBundleId();
            String symbolicName = bundle.getSymbolicName();
            String version = bundle.getHeaders().get(org.osgi.framework.Constants.BUNDLE_VERSION);
            String bundleLocation = bundle.getLocation();
            int status = bundle.getState();
            int level = bundle.adapt(BundleStartLevel.class).getStartLevel();
            String id = getId(bundle);

            // check if the bundle location is allowed outbound for this cluster group
            if (!isAllowed(group, Constants.CATEGORY, bundleLocation, EventType.OUTBOUND)) {
                LOGGER.trace("CELLAR BUNDLE: bundle {} is marked BLOCKED OUTBOUND for cluster group {}", bundleLocation, groupName);
                continue;
            }

            // single lookup: a separate containsKey()/get() pair can observe the entry
            // vanishing in between on a shared map and dereference null
            BundleState bundleState = clusterBundles.get(id);
            if (bundleState == null) {
                LOGGER.debug("CELLAR BUNDLE: deploying bundle {} on the cluster", id);
                bundleState = new BundleState();
                // get the bundle name or location.
                String name = (String) bundle.getHeaders().get(org.osgi.framework.Constants.BUNDLE_NAME);
                // if there is no name, then default to symbolic name.
                name = (name == null) ? symbolicName : name;
                // if there is no symbolic name, resort to location.
                name = (name == null) ? bundleLocation : name;
                bundleState.setId(bundleId);
                bundleState.setName(name);
                bundleState.setStartLevel(level);
                bundleState.setSymbolicName(symbolicName);
                bundleState.setVersion(version);
                bundleState.setLocation(bundleLocation);
                bundleState.setStatus(status);
            } else if (bundleState.getStatus() != status) {
                LOGGER.debug("CELLAR BUNDLE: updating bundle id: {}, name: {}, location: {} status: {} on the cluster", id, symbolicName, bundleLocation, status);
                bundleState.setStatus(status);
            } else {
                // cluster state already matches the local state
                continue;
            }

            // update cluster state
            clusterBundles.put(id, bundleState);
            // send cluster event
            ClusterBundleEvent clusterEvent = new ClusterBundleEvent(symbolicName, version, bundleLocation, level, status);
            clusterEvent.setSourceGroup(group);
            clusterEvent.setSourceNode(clusterManager.getNode());
            clusterEvent.setLocal(clusterManager.getNode());
            eventProducer.produce(clusterEvent);
        }

        // clean bundles on the cluster not present locally
        // compute the local ids once instead of rescanning all bundles for each cluster entry
        java.util.Set<String> localBundleIds = new java.util.HashSet<String>();
        for (Bundle bundle : bundleContext.getBundles()) {
            localBundleIds.add(getId(bundle));
        }
        // collect first, remove afterwards: removing while iterating the entry set is only
        // safe when the map implementation hands out a snapshot
        java.util.List<String> staleIds = new java.util.ArrayList<String>();
        for (Map.Entry<String, BundleState> entry : clusterBundles.entrySet()) {
            String id = entry.getKey();
            BundleState state = entry.getValue();
            if (state != null
                    && isAllowed(group, Constants.CATEGORY, state.getLocation(), EventType.OUTBOUND)
                    && !localBundleIds.contains(id)) {
                staleIds.add(id);
            }
        }
        for (String staleId : staleIds) {
            clusterBundles.remove(staleId);
        }

        getSynchronizerMap().putIfAbsent(Constants.BUNDLE_MAP + Configurations.SEPARATOR + groupName, true);
    } finally {
        Thread.currentThread().setContextClassLoader(originalClassLoader);
    }
}