public HighestRankingOSGi()

in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/HighestRankingOSGi.java [33:108]


    /**
     * Creates a program that tracks every element produced by
     * {@code previous} and sends only the greatest one, as ordered by
     * {@code comparator}, straight to the downstream publisher. All other
     * elements are published through a {@code Pad} built from
     * {@code notHighest}.
     *
     * <p>Elements are kept in a {@link PriorityQueue} ordered by the reversed
     * comparator, so the head of the queue is always the greatest element.
     * Every mutation of the queue, and every close/publish that follows from
     * it, happens while holding the queue's monitor.
     *
     * @param previous the program whose elements are ranked
     * @param comparator ordering used to decide which element is the highest
     * @param notHighest transformation handed to the {@code Pad} that receives
     *        the elements that are not currently the highest (presumably
     *        applied before they reach the publisher; confirm against
     *        {@code Pad})
     */
    public HighestRankingOSGi(
        OSGi<T> previous, Comparator<? super T> comparator,
        Function<OSGi<T>, OSGi<T>> notHighest) {

        super((executionContext, publisher) -> {
            Comparator<Tuple<T>> byValue = Comparator.comparing(
                Tuple::getT, comparator);

            // Reversed ordering: the head of the queue is the greatest element.
            PriorityQueue<Tuple<T>> ranked = new PriorityQueue<>(
                byValue.reversed());

            // The tuple most recently sent directly to the publisher, if any.
            AtomicReference<Tuple<T>> published = new AtomicReference<>();

            Pad<T, T> lowerRankedPad = new Pad<>(
                executionContext, notHighest, publisher);

            OSGiResult upstream = previous.run(
                executionContext,
                publisher.pipe(t -> {
                    Tuple<T> entry = new Tuple<>(t);

                    synchronized (ranked) {
                        ranked.add(entry);

                        if (ranked.peek() != entry) {
                            // Not the new head: goes through the pad only.
                            entry.osgiResult = lowerRankedPad.publish(t);
                        } else {
                            Tuple<T> demoted = published.get();
                            boolean hadPublished = demoted != null;

                            // Order matters: retire the former highest, send
                            // the new one, then re-route the former highest
                            // through the pad.
                            if (hadPublished) {
                                demoted.osgiResult.run();
                            }

                            entry.osgiResult = publisher.apply(t);

                            if (hadPublished) {
                                demoted.osgiResult = lowerRankedPad.publish(
                                    demoted.t);
                            }

                            published.set(entry);
                        }
                    }

                    return new OSGiResultImpl(
                        // Close: drop this entry and, if the head changed,
                        // promote the new head to the publisher.
                        () -> {
                            synchronized (ranked) {
                                Tuple<T> headBefore = ranked.peek();

                                ranked.remove(entry);

                                Tuple<T> headAfter = ranked.peek();

                                entry.osgiResult.run();

                                if (headAfter == null) {
                                    // Queue drained: nothing is published.
                                    published.set(null);
                                } else if (headAfter != headBefore) {
                                    // Close its pad publication before
                                    // sending it as the highest.
                                    headAfter.osgiResult.run();
                                    headAfter.osgiResult = publisher.apply(
                                        headAfter.t);
                                    published.set(headAfter);
                                }
                            }
                        },
                        // Update: delegated to the result of the current
                        // head of the queue.
                        // NOTE(review): peek() returns null on an empty
                        // queue, so this assumes at least one tracked entry
                        // whenever update is invoked - confirm.
                        () -> {
                            synchronized (ranked) {
                                return ranked.peek().osgiResult.update();
                            }
                        }
                    );
                }));

            return new AggregateOSGiResult(upstream, lowerRankedPad);
        });
    }