Stream concurrentAsyncMap()

in lib/src/async_map.dart [91:109]


  Stream<S> concurrentAsyncMap<S>(FutureOr<S> Function(T) convert) {
    var valuesWaiting = 0;
    var sourceDone = false;
    return transformByHandlers(onData: (element, sink) {
      valuesWaiting++;
      () async {
        try {
          sink.add(await convert(element));
        } catch (e, st) {
          sink.addError(e, st);
        }
        valuesWaiting--;
        if (valuesWaiting <= 0 && sourceDone) sink.close();
      }();
    }, onDone: (sink) {
      sourceDone = true;
      if (valuesWaiting <= 0) sink.close();
    });
  }