in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/BaseOSGiImpl.java [362:394]
public <K, S> OSGi<S> splitBy(
    Function<T, OSGi<K>> mapper, BiFunction<K, OSGi<T>, OSGi<S>> fun) {

    // Builds a program that routes every value emitted by this program into
    // per-key Pads: `mapper` yields the key(s) for a value, and the Pad for
    // each key is created on first use from `fun.apply(key, ...)`.
    return new BaseOSGiImpl<>((executionContext, op) -> {
        // One Pad per distinct key, created lazily the first time the key
        // is emitted by the mapper's program.
        ConcurrentHashMap<K, Pad<T, S>> padsByKey = new ConcurrentHashMap<>();

        // NOTE(review): the Pad is constructed inside computeIfAbsent's
        // mapping function. ConcurrentHashMap requires that function not to
        // update the same map - confirm Pad construction never re-enters
        // this splitBy instance.
        OSGiResult upstream = run(
            executionContext,
            op.pipe(value -> mapper.apply(value).run(
                executionContext,
                key -> padsByKey.computeIfAbsent(
                    key,
                    unused -> new Pad<>(
                        executionContext,
                        grouped -> fun.apply(key, grouped),
                        op)
                ).publish(value)
            ))
        );

        return new OSGiResultImpl(
            () -> {
                // Tear down every per-key Pad first, then the upstream run.
                for (Pad<T, S> pad : padsByKey.values()) {
                    pad.close();
                }

                upstream.close();
            },
            () -> {
                // Every Pad is updated unconditionally; the flags are OR-ed.
                boolean anyPadUpdated = false;

                for (Pad<T, S> pad : padsByKey.values()) {
                    anyPadUpdated |= pad.update();
                }

                // Non-short-circuit '|' so the upstream result is always
                // updated, even when a Pad already reported a change.
                return anyPadUpdated | upstream.update();
            }
        );
    });
}