in component-dsl/src/main/java/org/apache/aries/component/dsl/internal/DistributeOSGiImpl.java [34:74]
/**
 * Builds a program that runs {@code operation} and publishes each value
 * passing through the piped publisher to one {@code Pad} per entry of
 * {@code funs}.
 *
 * <p>The result returned for the whole program aggregates the result of
 * running {@code operation} together with an aggregate over all pads.
 *
 * @param operation the program that is run with the piped publisher
 * @param funs the functions; one {@code Pad} is created for each of them,
 *             in array order, sharing the same execution context and
 *             downstream publisher
 */
public DistributeOSGiImpl(OSGi<T> operation, Function<OSGi<T>, OSGi<S>>... funs) {
    super((executionContext, publisher) -> {
        // One pad per function, created eagerly before the operation runs.
        @SuppressWarnings("unchecked")
        Pad<T, S>[] pads = new Pad[funs.length];

        for (int index = 0; index < pads.length; index++) {
            pads[index] = new Pad<>(executionContext, funs[index], publisher);
        }

        OSGiResult result = operation.run(
            executionContext,
            publisher.pipe(value -> {
                // Results of publishing this value, in pad order.
                List<OSGiResult> published = new ArrayList<>(pads.length);

                try {
                    for (Pad<T, S> pad : pads) {
                        published.add(pad.publish(value));
                    }
                }
                catch (Exception e) {
                    // Hand the results collected so far to cleanUp, then
                    // propagate the original failure unchanged.
                    cleanUp(published);

                    throw e;
                }

                return new OSGiResultImpl(
                    () -> cleanUp(published),
                    () -> {
                        // Non-short-circuit OR: update() is invoked on every
                        // collected result, even after one reports true.
                        boolean updated = false;

                        for (OSGiResult publication : published) {
                            updated |= publication.update();
                        }

                        return updated;
                    });
            }));

        return new AggregateOSGiResult(result, new AggregateOSGiResult(pads));
    });
}