in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/ConfigurationsOSGiImpl.java [41:159]
/**
 * Builds a DSL node that follows the configurations belonging to one
 * factory PID.
 *
 * <p>When the node is executed it registers a {@link ConfigurationListener},
 * publishes every configuration returned by {@code getConfigurations} for the
 * factory PID through {@code op.publish}, and stores the resulting
 * {@link OSGiResult} per pid. Later configuration events for the same factory
 * PID either run the stored result for a deleted pid or replace it with a
 * freshly applied one. Closing the returned result unregisters the listener
 * and runs every stored result.
 *
 * @param factoryPid factory PID to follow; events whose factory PID is not
 *        {@code equals} to this value are ignored
 */
public ConfigurationsOSGiImpl(String factoryPid) {
    super((executionContext, op) -> {
        // Last change count seen per pid, used to drop duplicate events.
        ConcurrentHashMap<String, Long> configurationCounters =
            new ConcurrentHashMap<>();

        // Result (terminator) of the currently published configuration per pid.
        ConcurrentHashMap<String, OSGiResult> terminators =
            new ConcurrentHashMap<>();

        AtomicBoolean closed = new AtomicBoolean();

        // Released once the initial configurations have been published.
        CountDownLatch countDownLatch = new CountDownLatch(1);

        final BundleContext bundleContext = executionContext.getBundleContext();

        ServiceRegistration<?> serviceRegistration =
            bundleContext.registerService(
                ConfigurationListener.class,
                (ConfigurationEvent configurationEvent) -> {
                    String incomingFactoryPid =
                        configurationEvent.getFactoryPid();

                    if (!factoryPid.equals(incomingFactoryPid)) {
                        return;
                    }

                    try {
                        // Wait for the initial publication. The outcome of
                        // await is not inspected, so after a one minute
                        // timeout the event is processed anyway.
                        countDownLatch.await(1, TimeUnit.MINUTES);
                    }
                    catch (InterruptedException e) {
                        // Keep the interruption visible to the delivering
                        // thread instead of silently discarding it.
                        Thread.currentThread().interrupt();

                        return;
                    }

                    String pid = configurationEvent.getPid();

                    if (configurationEvent.getType() ==
                            ConfigurationEvent.CM_DELETED) {

                        configurationCounters.remove(pid);

                        signalLeave(pid, terminators);

                        return;
                    }

                    Configuration configuration = getConfiguration(
                        bundleContext, configurationEvent);

                    long changeCount = configuration.getChangeCount();

                    // put (not putIfAbsent): the stored counter must advance
                    // with every observed change, otherwise later events that
                    // repeat an already processed change count would not be
                    // recognised as duplicates.
                    Long previousChangeCount =
                        configurationCounters.put(pid, changeCount);

                    if (previousChangeCount != null) {
                        if (previousChangeCount.longValue() == changeCount) {
                            // Already processed this revision.
                            return;
                        }

                        OSGiResult osgiResult = terminators.get(pid);

                        // Offer the update to the existing result first; when
                        // sendUpdate reports false nothing else is done here.
                        // NOTE(review): presumably false means the update was
                        // absorbed in place - confirm against UpdateSupport.
                        if (osgiResult != null &&
                            !UpdateSupport.sendUpdate(osgiResult)) {

                            return;
                        }
                    }

                    // Replace the previous result for this pid with a new one.
                    UpdateSupport.runUpdate(() -> {
                        signalLeave(pid, terminators);

                        terminators.put(
                            pid,
                            op.apply(
                                new ConfigurationHolderImpl(configuration)));
                    });

                    if (closed.get()) {
                        /*
                        if we have closed while executing the
                        effects we have to execute the terminator
                        directly instead of storing it
                        */
                        signalLeave(pid, terminators);
                    }
                },
                new Hashtable<>());

        ServiceReference<ConfigurationAdmin> serviceReference =
            bundleContext.getServiceReference(ConfigurationAdmin.class);

        if (serviceReference != null) {
            Configuration[] configurations = getConfigurations(
                bundleContext, factoryPid, serviceReference);

            for (Configuration configuration : configurations) {
                configurationCounters.put(
                    configuration.getPid(), configuration.getChangeCount());

                terminators.put(
                    configuration.getPid(),
                    op.publish(new ConfigurationHolderImpl(configuration)));
            }
        }

        // Let listener invocations that were waiting proceed.
        countDownLatch.countDown();

        return new OSGiResultImpl(
            () -> {
                closed.set(true);

                serviceRegistration.unregister();

                // ConcurrentHashMap never holds null values, so every entry
                // can be run directly.
                for (Runnable terminator : terminators.values()) {
                    terminator.run();
                }
            },
            // Forward the update to every stored result and OR the answers.
            () -> terminators.values().stream().map(
                OSGiResult::update
            ).reduce(
                Boolean.FALSE, Boolean::logicalOr
            ));
    });
}