in lib/pool.dart [153:229]
Stream<T> forEach<S, T>(
Iterable<S> elements, FutureOr<T> Function(S source) action,
{bool Function(S item, Object error, StackTrace stack)? onError}) {
onError ??= (item, e, s) => true;
var cancelPending = false;
Completer? resumeCompleter;
late StreamController<T> controller;
late Iterator<S> iterator;
Future<void> run(int _) async {
while (iterator.moveNext()) {
// caching `current` is necessary because there are async breaks
// in this code and `iterator` is shared across many workers
final current = iterator.current;
_resetTimer();
if (resumeCompleter != null) {
await resumeCompleter!.future;
}
if (cancelPending) {
break;
}
T value;
try {
value = await action(current);
} catch (e, stack) {
if (onError!(current, e, stack)) {
controller.addError(e, stack);
}
continue;
}
controller.add(value);
}
}
Future<void>? doneFuture;
void onListen() {
iterator = elements.iterator;
assert(doneFuture == null);
var futures = Iterable<Future<void>>.generate(
_maxAllocatedResources, (i) => withResource(() => run(i)));
doneFuture = Future.wait(futures, eagerError: true)
.then<void>((_) {})
.catchError(controller.addError);
doneFuture!.whenComplete(controller.close);
}
controller = StreamController<T>(
sync: true,
onListen: onListen,
onCancel: () async {
assert(!cancelPending);
cancelPending = true;
await doneFuture;
},
onPause: () {
assert(resumeCompleter == null);
resumeCompleter = Completer<void>();
},
onResume: () {
assert(resumeCompleter != null);
resumeCompleter!.complete();
resumeCompleter = null;
},
);
return controller.stream;
}