Stream mergeExpanded()

in lib/src/merge.dart [136:182]


  /// Flattens this stream of streams by forwarding the events of every inner
  /// stream as they arrive, without waiting for earlier inner streams to end.
  ///
  /// Data and errors from each inner stream, and errors from this stream, are
  /// passed through to the result. The result closes once this stream and
  /// every inner stream that was listened to are done.
  ///
  /// The result is a broadcast stream exactly when this stream is. In that
  /// case inner streams that are not already broadcast are converted with
  /// [Stream.asBroadcastStream] before being listened to. Otherwise the result
  /// is a single-subscription stream whose pause and resume are forwarded to
  /// every active subscription.
  ///
  /// When the result is cancelled, every active subscription is cancelled and
  /// the cancellation completes after all of those cancellations finish.
  Stream<T> mergeExpanded() {
    final StreamController<T> output;
    if (isBroadcast) {
      output = StreamController<T>.broadcast(sync: true);
    } else {
      output = StreamController<T>(sync: true);
    }

    output.onListen = () {
      // Every live subscription: the outer one plus one per inner stream.
      final active = <StreamSubscription<dynamic>>[];

      // Drops a finished subscription and closes the output once none remain.
      void retire(StreamSubscription<dynamic> finished) {
        active.remove(finished);
        if (active.isEmpty) output.close();
      }

      final outer = listen((inner) {
        // A broadcast output may be listened to repeatedly, so each inner
        // stream must be a broadcast stream as well.
        final source = isBroadcast && !inner.isBroadcast
            ? inner.asBroadcastStream()
            : inner;
        final innerSubscription =
            source.listen(output.add, onError: output.addError);
        innerSubscription.onDone(() => retire(innerSubscription));
        active.add(innerSubscription);
      }, onError: output.addError);
      outer.onDone(() => retire(outer));
      active.add(outer);

      // Only the single-subscription controller gets pause/resume callbacks.
      if (!isBroadcast) {
        output.onPause = () {
          for (final subscription in active) {
            subscription.pause();
          }
        };
        output.onResume = () {
          for (final subscription in active) {
            subscription.resume();
          }
        };
      }

      output.onCancel = () {
        if (active.isNotEmpty) {
          final pending = active.map((s) => s.cancel()).toList()
            // Handle opt-out nulls
            ..removeWhere((Object? f) => f == null);
          return Future.wait(pending).then((_) => null);
        }
        return null;
      };
    };

    return output.stream;
  }