public OSGi applyTo()

in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java [104:195]


    /**
     * Combines every value published by this program with every function
     * published by {@code fun}, passing each {@code f.apply(t)} downstream
     * through {@code op}.
     *
     * <p>Both this program and {@code fun} are run with the same execution
     * context. Whenever either side publishes a new element, it is paired with
     * all elements currently tracked from the other side. The
     * {@code OSGiResult} returned by {@code op.apply} for each
     * (value, function) pair is stored so that it can later be run or updated
     * through the result handed back for the value or for the function.
     *
     * <p>Values and functions are keyed by identity ({@code IdentityHashMap}),
     * and all bookkeeping happens while holding the monitor of the internal
     * list of values.
     *
     * @param fun program publishing the functions to apply to this program's
     *            values
     * @param <S> type produced by the functions
     * @return a program whose results are the applications of the tracked
     *         functions to the tracked values
     */
    public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
        return new BaseOSGiImpl<>((executionContext, op) -> {
            // Values and functions currently being tracked.
            ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
            ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
            // value -> (function -> downstream result for that pair).
            // Only accessed while holding the monitor of 'identities'.
            IdentityHashMap<T, IdentityHashMap<Function<T, S>, OSGiResult>>
                terminators = new IdentityHashMap<>();

            OSGiResult funRun = fun.run(
                executionContext,
                op.pipe(f -> {
                    synchronized(identities) {
                        ConcurrentDoublyLinkedList.Node node = functions.addLast(f);

                        // Pair the new function with every value already known.
                        for (T t : identities) {
                            IdentityHashMap<Function<T, S>, OSGiResult> terminatorMap =
                                terminators.computeIfAbsent(
                                    t, __ -> new IdentityHashMap<>());
                            terminatorMap.put(f, op.apply(f.apply(t)));
                        }

                        return new OSGiResultImpl(
                            // First callback: presumably the close action for
                            // this function (confirm against OSGiResultImpl).
                            // Stops tracking f and runs the stored result of
                            // every (value, f) pair.
                            () -> {
                                synchronized (identities) {
                                    node.remove();

                                    identities.forEach(t -> {
                                        Runnable terminator = terminators.get(t).remove(f);

                                        terminator.run();
                                    });
                                }
                            },
                            // Second callback: forwards update() to the stored
                            // result of every (value, f) pair. All of them are
                            // invoked; the outcome is the logical OR.
                            () -> {
                                synchronized (identities) {
                                    return identities.stream().map(t -> {
                                        OSGiResult terminator = terminators.get(t).get(f);

                                        return terminator.update();
                                    }).reduce(
                                        Boolean.FALSE, Boolean::logicalOr
                                    );
                                }
                            }
                        );
                    }
                }
            ));

            OSGiResult myRun = run(
                executionContext,
                op.pipe(t -> {
                    synchronized (identities) {
                        ConcurrentDoublyLinkedList.Node node = identities.addLast(t);

                        // Pair the new value with every function already known.
                        for (Function<T, S> f : functions) {
                            IdentityHashMap<Function<T, S>, OSGiResult> terminatorMap =
                                terminators.computeIfAbsent(
                                    t, __ -> new IdentityHashMap<>());
                            terminatorMap.put(f, op.apply(f.apply(t)));
                        }

                        return new OSGiResultImpl(
                            // First callback: presumably the close action for
                            // this value (confirm against OSGiResultImpl).
                            // Stops tracking t and runs the stored result of
                            // every (t, function) pair.
                            () -> {
                                synchronized (identities) {
                                    node.remove();

                                    functions.forEach(f -> {
                                        Runnable terminator = terminators.get(t).remove(f);

                                        terminator.run();
                                    });

                                    // t is no longer in 'identities', so no
                                    // other path will look it up again. Drop
                                    // its (now empty) entry so the map does
                                    // not keep every value ever published
                                    // strongly reachable.
                                    terminators.remove(t);
                                }
                            },
                            // Second callback: forwards update() to the stored
                            // result of every (t, function) pair. All of them
                            // are invoked; the outcome is the logical OR.
                            () -> {
                                synchronized (identities) {
                                    return functions.stream().map(f -> {
                                        OSGiResult terminator = terminators.get(t).get(f);

                                        return terminator.update();
                                    }).reduce(
                                        Boolean.FALSE, Boolean::logicalOr
                                    );
                                }
                            }
                        );
                    }
                })
            );

            // A single result that wraps both underlying runs.
            return new AggregateOSGiResult(myRun, funRun);
        });
    }