in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java [104:195]
/**
 * Combines this program with a program that produces functions: every
 * function emitted by {@code fun} is applied to every value emitted by
 * this program, and each result is handed to the downstream operation.
 *
 * <p>Bookkeeping kept per run of the returned program:
 * <ul>
 * <li>{@code identities}: the values of this program currently alive;</li>
 * <li>{@code functions}: the functions of {@code fun} currently alive;</li>
 * <li>{@code terminators}: for each live value (compared by reference),
 * the {@code OSGiResult} obtained from the downstream operation for each
 * function it has been combined with.</li>
 * </ul>
 * All three structures are only touched while holding the monitor of
 * {@code identities}.
 *
 * @param fun the program emitting the functions to apply
 * @param <S> the type produced by the functions
 * @return a program emitting {@code f.apply(t)} for every pair of live
 * function {@code f} and live value {@code t}; its result aggregates the
 * results of running this program and {@code fun}
 */
public <S> OSGi<S> applyTo(OSGi<Function<T, S>> fun) {
    return new BaseOSGiImpl<>((executionContext, op) -> {
        ConcurrentDoublyLinkedList<T> identities = new ConcurrentDoublyLinkedList<>();
        ConcurrentDoublyLinkedList<Function<T,S>> functions = new ConcurrentDoublyLinkedList<>();
        IdentityHashMap<T, IdentityHashMap<Function<T, S>, OSGiResult>>
            terminators = new IdentityHashMap<>();

        // Function side: a new function is combined with every value that
        // is already known.
        OSGiResult funRun = fun.run(
            executionContext,
            op.pipe(f -> {
                synchronized(identities) {
                    ConcurrentDoublyLinkedList.Node node = functions.addLast(f);

                    for (T t : identities) {
                        IdentityHashMap<Function<T, S>, OSGiResult> terminatorMap =
                            terminators.computeIfAbsent(
                                t, __ -> new IdentityHashMap<>());

                        terminatorMap.put(f, op.apply(f.apply(t)));
                    }

                    return new OSGiResultImpl(
                        // Release callback: forget the function and run
                        // the terminator of every (value, f) combination.
                        () -> {
                            synchronized (identities) {
                                node.remove();

                                identities.forEach(t -> {
                                    Runnable terminator = terminators.get(t).remove(f);

                                    terminator.run();
                                });
                            }
                        },
                        // Update callback: propagate to every combination;
                        // true if any of them reports true. The map step
                        // runs for all elements, there is no short-circuit.
                        () -> {
                            synchronized (identities) {
                                return identities.stream().map(t -> {
                                    OSGiResult terminator = terminators.get(t).get(f);

                                    return terminator.update();
                                }).reduce(
                                    Boolean.FALSE, Boolean::logicalOr
                                );
                            }
                        }
                    );
                }
            }
        ));

        // Value side: a new value is combined with every function that is
        // already known.
        OSGiResult myRun = run(
            executionContext,
            op.pipe(t -> {
                synchronized (identities) {
                    ConcurrentDoublyLinkedList.Node node = identities.addLast(t);

                    for (Function<T, S> f : functions) {
                        IdentityHashMap<Function<T, S>, OSGiResult> terminatorMap =
                            terminators.computeIfAbsent(
                                t, __ -> new IdentityHashMap<>());

                        terminatorMap.put(f, op.apply(f.apply(t)));
                    }

                    return new OSGiResultImpl(
                        // Release callback: forget the value and run the
                        // terminator of every (t, function) combination.
                        () -> {
                            synchronized (identities) {
                                node.remove();

                                // Detach the per-value map from the outer
                                // map so that neither the map nor the
                                // reference to t outlives the value. Once
                                // t has left identities no other callback
                                // looks this entry up. The result is null
                                // only when t was never combined with a
                                // function, in which case the loop below
                                // has nothing to dereference.
                                IdentityHashMap<Function<T, S>, OSGiResult> released =
                                    terminators.remove(t);

                                functions.forEach(f -> {
                                    Runnable terminator = released.remove(f);

                                    terminator.run();
                                });
                            }
                        },
                        // Update callback: propagate to every combination;
                        // true if any of them reports true.
                        () -> {
                            synchronized (identities) {
                                return functions.stream().map(f -> {
                                    OSGiResult terminator = terminators.get(t).get(f);

                                    return terminator.update();
                                }).reduce(
                                    Boolean.FALSE, Boolean::logicalOr
                                );
                            }
                        }
                    );
                }
            })
        );

        return new AggregateOSGiResult(myRun, funRun);
    });
}