public Publisher transform()

in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/OnceTransformer.java [31:53]


    /**
     * Wraps {@code op} in a publisher that forwards a value to {@code op}
     * only when its internal counter goes from zero to one, and releases the
     * result obtained from {@code op} when that counter drops back to zero.
     *
     * <p>Each invocation of the returned publisher increments the counter and
     * returns a fresh {@link OSGiResultImpl} built from two callbacks:
     * <ul>
     *   <li>the first decrements the counter and, when it reaches zero,
     *       hands {@link UpdateSupport#deferTermination} a task that swaps
     *       the stored result for {@code OSGi.NOOP} and runs the previous
     *       one;</li>
     *   <li>the second delegates to {@code update()} on the currently stored
     *       result.</li>
     * </ul>
     *
     * <p>NOTE(review): the stored result is {@code null} until the task given
     * to {@link UpdateSupport#deferPublication} has executed. If either
     * callback can run before that, it would dereference {@code null} —
     * confirm the ordering guarantees of {@code UpdateSupport}.
     *
     * @param op the publisher to which the value is forwarded
     * @return a publisher that applies {@code op} only on the zero-to-one
     *         transition of the counter
     */
    public Publisher<T> transform(Publisher<? super T> op) {
        // Holds whatever op returned for the current run; null until the
        // publication task has been executed, OSGi.NOOP after termination.
        AtomicReference<OSGiResult> downstream = new AtomicReference<>();

        // Incremented on every publish, decremented every time the first
        // callback of a returned result runs.
        AtomicInteger outstanding = new AtomicInteger();

        return t -> {
            boolean firstOutstanding = outstanding.getAndIncrement() == 0;

            if (firstOutstanding) {
                // Only the zero-to-one transition reaches op.
                UpdateSupport.deferPublication(
                    () -> downstream.set(op.apply(t)));
            }

            return new OSGiResultImpl(
                () -> {
                    boolean lastOutstanding =
                        outstanding.decrementAndGet() == 0;

                    if (lastOutstanding) {
                        // Swap in NOOP first so the stored result is taken
                        // (and run) by a single termination task only.
                        UpdateSupport.deferTermination(
                            () -> downstream.getAndSet(OSGi.NOOP).run());
                    }
                },
                () -> downstream.get().update()
            );
        };
    }