Stream forEach()

in lib/pool.dart [153:229]


  Stream<T> forEach<S, T>(
      Iterable<S> elements, FutureOr<T> Function(S source) action,
      {bool Function(S item, Object error, StackTrace stack)? onError}) {
    onError ??= (item, e, s) => true;

    var cancelPending = false;

    Completer? resumeCompleter;
    late StreamController<T> controller;

    late Iterator<S> iterator;

    Future<void> run(int _) async {
      while (iterator.moveNext()) {
        // caching `current` is necessary because there are async breaks
        // in this code and `iterator` is shared across many workers
        final current = iterator.current;

        _resetTimer();

        if (resumeCompleter != null) {
          await resumeCompleter!.future;
        }

        if (cancelPending) {
          break;
        }

        T value;
        try {
          value = await action(current);
        } catch (e, stack) {
          if (onError!(current, e, stack)) {
            controller.addError(e, stack);
          }
          continue;
        }
        controller.add(value);
      }
    }

    Future<void>? doneFuture;

    void onListen() {
      iterator = elements.iterator;

      assert(doneFuture == null);
      var futures = Iterable<Future<void>>.generate(
          _maxAllocatedResources, (i) => withResource(() => run(i)));
      doneFuture = Future.wait(futures, eagerError: true)
          .then<void>((_) {})
          .catchError(controller.addError);

      doneFuture!.whenComplete(controller.close);
    }

    controller = StreamController<T>(
      sync: true,
      onListen: onListen,
      onCancel: () async {
        assert(!cancelPending);
        cancelPending = true;
        await doneFuture;
      },
      onPause: () {
        assert(resumeCompleter == null);
        resumeCompleter = Completer<void>();
      },
      onResume: () {
        assert(resumeCompleter != null);
        resumeCompleter!.complete();
        resumeCompleter = null;
      },
    );

    return controller.stream;
  }