Stream mergeAll()

in lib/src/merge.dart [58:103]


  Stream<T> mergeAll(Iterable<Stream<T>> others) {
    final controller = isBroadcast
        ? StreamController<T>.broadcast(sync: true)
        : StreamController<T>(sync: true);

    final allStreams = [
      this,
      for (final other in others)
        !isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(),
    ];

    controller.onListen = () {
      final subscriptions = <StreamSubscription<T>>[];
      for (final stream in allStreams) {
        final subscription =
            stream.listen(controller.add, onError: controller.addError);
        subscription.onDone(() {
          subscriptions.remove(subscription);
          if (subscriptions.isEmpty) controller.close();
        });
        subscriptions.add(subscription);
      }
      if (!isBroadcast) {
        controller
          ..onPause = () {
            for (final subscription in subscriptions) {
              subscription.pause();
            }
          }
          ..onResume = () {
            for (final subscription in subscriptions) {
              subscription.resume();
            }
          };
      }
      controller.onCancel = () {
        if (subscriptions.isEmpty) return null;
        var cancels = [for (var s in subscriptions) s.cancel()]
          // Handle opt-out nulls
          ..removeWhere((Object? f) => f == null);
        if (cancels.isEmpty) return null;
        return Future.wait(cancels).then((_) => null);
      };
    };
    return controller.stream;
  }