in lib/src/merge.dart [136:182]
Stream<T> mergeExpanded() {
final controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
controller.onListen = () {
final subscriptions = <StreamSubscription<dynamic>>[];
final outerSubscription = listen((inner) {
if (isBroadcast && !inner.isBroadcast) {
inner = inner.asBroadcastStream();
}
final subscription =
inner.listen(controller.add, onError: controller.addError);
subscription.onDone(() {
subscriptions.remove(subscription);
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(subscription);
}, onError: controller.addError);
outerSubscription.onDone(() {
subscriptions.remove(outerSubscription);
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(outerSubscription);
if (!isBroadcast) {
controller
..onPause = () {
for (final subscription in subscriptions) {
subscription.pause();
}
}
..onResume = () {
for (final subscription in subscriptions) {
subscription.resume();
}
};
}
controller.onCancel = () {
if (subscriptions.isEmpty) return null;
var cancels = [for (var s in subscriptions) s.cancel()]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
}