public CoalesceOSGiImpl()

in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/CoalesceOSGiImpl.java [35:145]


    /**
     * Creates a program that combines the given {@code programs} by
     * position in the array.
     *
     * <p>When run, programs are started in array order until one of them has
     * at least one live publication. A later program is only started once
     * the publications of an earlier one drop to zero, and the results of
     * later programs are closed again as soon as an earlier program
     * publishes. Every publication is forwarded downstream through
     * {@code UpdateSupport.deferPublication}.</p>
     *
     * <p>All bookkeeping is done while holding the monitor of a single
     * per-run {@code AtomicBoolean}.</p>
     *
     * @param programs the candidate programs, in order of preference; an
     *                 empty array yields a program that never publishes
     */
    public CoalesceOSGiImpl(OSGi<T>... programs) {
        super((executionContext, op) -> {
            // Flag telling whether the initial start-up loop has finished;
            // the same object is used as the monitor for all state below.
            AtomicBoolean initialized = new AtomicBoolean();
            // Per program: number of publications not yet terminated.
            AtomicInteger[] atomicIntegers = new AtomicInteger[programs.length];
            // Per program: result of its latest run; null until it is run.
            OSGiResult[] results = new OSGiResult[programs.length];
            // Position of the program most recently started or selected.
            AtomicInteger index = new AtomicInteger();
            @SuppressWarnings("unchecked")
            Publisher<T>[] publishers = new Publisher[programs.length];

            for (int i = 0; i < atomicIntegers.length; i++) {
                atomicIntegers[i] = new AtomicInteger();
            }

            for (int i = 0; i < atomicIntegers.length; i++) {
                AtomicInteger atomicInteger = atomicIntegers[i];

                final int pos = i;

                publishers[i] = t -> {
                    // Holds the downstream result once the deferred
                    // publication has been performed.
                    AtomicReference<OSGiResult> result =
                        new AtomicReference<>();

                    synchronized (initialized) {
                        atomicInteger.incrementAndGet();

                        if (initialized.get()) {
                            int indexInt = index.getAndSet(pos);

                            // An earlier program published: close every
                            // later program that had been started.
                            if (pos < indexInt) {
                                for (int j = pos + 1; j <= indexInt; j++) {
                                    results[j].close();
                                }

                            }
                        }

                        UpdateSupport.deferPublication(
                            () -> result.set(op.publish(t)));
                    }

                    return new OSGiResultImpl(() -> UpdateSupport.deferTermination(() -> {
                        synchronized (initialized) {
                            result.get().close();

                            int current = atomicInteger.decrementAndGet();

                            // Before start-up completes, or after the
                            // aggregate result was closed, do not fall back.
                            if (!initialized.get()) {
                                return;
                            }

                            // This program has no publications left: start
                            // the following programs until one publishes.
                            if (pos <= index.get() && current == 0) {
                                for (int j = pos + 1; j < results.length; j++) {
                                    results[j] = programs[j].run(
                                        executionContext, publishers[j]);

                                    index.set(j);

                                    if (atomicIntegers[j].get() > 0) {
                                        break;
                                    }
                                }
                            }
                        }
                    }),
                    () -> {
                        synchronized (initialized) {
                            return result.get().update();
                        }
                    });
                };
            }

            synchronized (initialized) {
                // Start programs in order, stopping at the first one that
                // published while it was being run.
                for (int i = 0; i < publishers.length; i++) {

                    results[i] = programs[i].run(executionContext, publishers[i]);

                    index.set(i);

                    if (atomicIntegers[i].get() > 0) {
                        break;
                    }
                }

                initialized.set(true);
            }

            return new OSGiResultImpl(
                () -> {
                    synchronized (initialized) {
                        initialized.set(false);

                        // The length bound keeps an empty programs array
                        // from indexing results[0].
                        for (int i = 0;
                             i < results.length && i <= index.get(); i++) {

                            results[i].close();
                        }
                    }
                },
                () -> {
                    synchronized (initialized) {
                        // Slots of programs that were never run are still
                        // null and must be skipped.
                        return Arrays.stream(results).filter(
                            res -> res != null
                        ).map(
                            res -> res.update()
                        ).reduce(
                            Boolean.FALSE, Boolean::logicalOr
                        );
                    }
                }
            );
        });
    }