Stream asyncWhere()

in lib/src/where.dart [39:57]


  Stream<T> asyncWhere(FutureOr<bool> Function(T) test) {
    var valuesWaiting = 0;
    var sourceDone = false;
    return transformByHandlers(onData: (element, sink) {
      valuesWaiting++;
      () async {
        try {
          if (await test(element)) sink.add(element);
        } catch (e, st) {
          sink.addError(e, st);
        }
        valuesWaiting--;
        if (valuesWaiting <= 0 && sourceDone) sink.close();
      }();
    }, onDone: (sink) {
      sourceDone = true;
      if (valuesWaiting <= 0) sink.close();
    });
  }