in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java [33:108]
/**
 * Builds a program that forwards to the downstream {@code publisher} only
 * the element of {@code previous} that currently ranks highest according
 * to {@code comparator}; every other element is published through a
 * {@link Pad} built from {@code notHighest}.
 *
 * <p>All bookkeeping happens while holding the monitor of the internal
 * priority queue. "Highest" is decided by reference identity against the
 * queue head, not by {@code equals}.</p>
 *
 * @param previous the program whose elements are ranked
 * @param comparator ordering used to rank elements; the greatest element
 *        according to it sits at the head of the queue
 * @param notHighest transformation handed to the {@link Pad} that receives
 *        the elements that are not currently the highest
 */
public HighestRankingOSGi(
    OSGi<T> previous, Comparator<? super T> comparator,
    Function<OSGi<T>, OSGi<T>> notHighest) {

    super((executionContext, publisher) -> {
        Comparator<Tuple<T>> byValue = Comparator.comparing(
            Tuple::getT, comparator);

        // PriorityQueue exposes its least element at the head, so the
        // ordering is reversed to make peek() return the greatest one.
        PriorityQueue<Tuple<T>> ranked = new PriorityQueue<>(
            byValue.reversed());

        // The tuple most recently sent to the downstream publisher, or
        // null when none is.
        AtomicReference<Tuple<T>> highest = new AtomicReference<>();

        Pad<T, T> demotedPad = new Pad<>(
            executionContext, notHighest, publisher);

        OSGiResult upstream = previous.run(
            executionContext,
            publisher.pipe(value -> {
                Tuple<T> entry = new Tuple<>(value);

                synchronized (ranked) {
                    ranked.add(entry);

                    if (ranked.peek() != entry) {
                        // Not the new head: goes to the pad only.
                        entry.osgiResult = demotedPad.publish(value);
                    } else {
                        Tuple<T> dethroned = highest.get();
                        boolean hadHighest = dethroned != null;

                        // Order matters: the former highest has its
                        // result run before the new one is published
                        // downstream, and is handed to the pad only
                        // afterwards.
                        if (hadHighest) {
                            dethroned.osgiResult.run();
                        }

                        entry.osgiResult = publisher.apply(value);

                        if (hadHighest) {
                            dethroned.osgiResult = demotedPad.publish(
                                dethroned.t);
                        }

                        highest.set(entry);
                    }
                }

                return new OSGiResultImpl(
                    // First callback (presumably the close handler --
                    // confirm against OSGiResultImpl): drops this entry
                    // and, if the head changed, promotes the new head.
                    () -> {
                        synchronized (ranked) {
                            Tuple<T> headBefore = ranked.peek();

                            ranked.remove(entry);

                            Tuple<T> headAfter = ranked.peek();

                            entry.osgiResult.run();

                            if (headAfter == null) {
                                // Queue drained: nothing is sent anymore.
                                highest.set(null);
                            } else if (headAfter != headBefore) {
                                // The removed entry was the head: move the
                                // new head from the pad to the downstream
                                // publisher.
                                headAfter.osgiResult.run();
                                headAfter.osgiResult = publisher.apply(
                                    headAfter.t);
                                highest.set(headAfter);
                            }
                        }
                    },
                    // Second callback: delegates update() to the result of
                    // whichever tuple is at the head of the queue.
                    // NOTE(review): peek() returns null on an empty queue
                    // and is dereferenced unguarded -- confirm this
                    // callback cannot run once the queue is empty.
                    () -> {
                        synchronized (ranked) {
                            return ranked.peek().osgiResult.update();
                        }
                    }
                );
            }));

        return new AggregateOSGiResult(upstream, demotedPad);
    });
}