public OSGi splitBy()

in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java [362:394]


    /**
     * Splits the values produced by this program into groups identified by
     * keys, handing each group to its own {@link Pad}.
     *
     * <p>For every value published by this program, {@code mapper} is applied
     * and the resulting key program is run. For each key it publishes, the
     * {@link Pad} registered for that key is looked up (and created on first
     * use from {@code fun} applied to that key) and the value is published
     * to it.
     *
     * <p>Closing the returned program's result closes every pad first and
     * then the result of running this program. Updating it invokes
     * {@code update()} on every pad and on that result, and reports
     * {@code true} if any of them did.
     *
     * @param mapper produces, for each value, the program that emits the
     *               keys the value belongs to
     * @param fun builds the per-key program from the key and the
     *            {@code OSGi<T>} handed over by the pad
     * @param <K> the type of the keys
     * @param <S> the type of the values published by the resulting program
     * @return a program that routes each value to the pad of its key(s)
     */
    public <K, S> OSGi<S> splitBy(
        Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) {

        return new BaseOSGiImpl<>((executionContext, op) -> {
            ConcurrentHashMap<K, Pad<T, S>> padsByKey =
                new ConcurrentHashMap<>();

            OSGiResult upstream = run(
                executionContext,
                op.pipe(value -> mapper.apply(value).run(
                    executionContext,
                    // One pad per key: created lazily the first time the
                    // key shows up, reused for every later value.
                    key -> padsByKey.computeIfAbsent(
                        key,
                        absentKey -> new Pad<>(
                            executionContext,
                            keyed -> fun.apply(key, keyed), op)
                    ).publish(value)
                )
            ));

            return new OSGiResultImpl(
                () -> {
                    for (Pad<T, S> pad : padsByKey.values()) {
                        pad.close();
                    }

                    upstream.close();
                },
                () -> {
                    boolean padsUpdated = false;

                    // Non-short-circuit on purpose: every pad must get its
                    // update() call, whatever the previous ones returned.
                    for (Pad<T, S> pad : padsByKey.values()) {
                        padsUpdated |= pad.update();
                    }

                    // '|' rather than '||' so upstream.update() always runs.
                    return padsUpdated | upstream.update();
                }
            );
        });
    }