in lib/src/merge.dart [58:103]
Stream<T> mergeAll(Iterable<Stream<T>> others) {
final controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
final allStreams = [
this,
for (final other in others)
!isBroadcast || other.isBroadcast ? other : other.asBroadcastStream(),
];
controller.onListen = () {
final subscriptions = <StreamSubscription<T>>[];
for (final stream in allStreams) {
final subscription =
stream.listen(controller.add, onError: controller.addError);
subscription.onDone(() {
subscriptions.remove(subscription);
if (subscriptions.isEmpty) controller.close();
});
subscriptions.add(subscription);
}
if (!isBroadcast) {
controller
..onPause = () {
for (final subscription in subscriptions) {
subscription.pause();
}
}
..onResume = () {
for (final subscription in subscriptions) {
subscription.resume();
}
};
}
controller.onCancel = () {
if (subscriptions.isEmpty) return null;
var cancels = [for (var s in subscriptions) s.cancel()]
// Handle opt-out nulls
..removeWhere((Object? f) => f == null);
if (cancels.isEmpty) return null;
return Future.wait(cancels).then((_) => null);
};
};
return controller.stream;
}