in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/RefreshAsUpdatesTransformer.java [30:66]
public Publisher<T> transform(Publisher<? super T> op) {
ThreadLocal<ResultState> threadLocal = ThreadLocal.withInitial(() -> null);
return t -> {
AtomicReference<OSGiResult> atomicReference = new AtomicReference<>(OSGi.NOOP);
if (!UpdateSupport.isUpdate()) {
atomicReference.set(op.publish(t));
} else {
threadLocal.get().gone = false;
}
return new OSGiResultImpl(
() -> {
if (!UpdateSupport.isUpdate()) {
atomicReference.getAndSet(OSGi.NOOP).run();
} else {
threadLocal.set(new ResultState(true, atomicReference.get()));
UpdateSupport.deferTermination(
() -> {
if (threadLocal.get().gone) {
threadLocal.get().result.run();
threadLocal.remove();
atomicReference.set(OSGi.NOOP);
} else {
threadLocal.get().result.update();
}
}
);
}
},
() -> atomicReference.get().update()
);
};
}