public DistributeOSGiImpl()

in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java [34:74]


    public DistributeOSGiImpl(OSGi<T> operation, Function<OSGi<T>, OSGi<S>>... funs) {

        super((executionContext, publisher) -> {
            @SuppressWarnings("unchecked")
            Pad<T, S>[] pads = new Pad[funs.length];

            for (int i = 0; i < funs.length; i++) {
                pads[i] = new Pad<>(executionContext, funs[i], publisher);
            }

            OSGiResult result = operation.run(
                executionContext,
                publisher.pipe(t -> {
                    List<OSGiResult> terminators = new ArrayList<>(funs.length);

                    int i = 0;

                    try {
                        for (; i < funs.length; i++) {
                            terminators.add(pads[i].publish(t));
                        }
                    }
                    catch (Exception e) {
                        cleanUp(terminators);

                        throw e;
                    }

                    return new OSGiResultImpl(
                        () -> cleanUp(terminators),
                        () -> terminators.stream().map(
                            os -> os.update()
                        ).reduce(
                            Boolean.FALSE, Boolean::logicalOr
                        )
                    );
                }));

            return new AggregateOSGiResult(result, new AggregateOSGiResult(pads));
        });
    }