in lib/src/combine_latest.dart [36:140]
Stream<S> combineLatest<T2, S>(
    Stream<T2> other, FutureOr<S> Function(T, T2) combine) {
  // Combines the most recent value of this stream with the most recent value
  // of [other] by calling [combine] whenever either stream emits, once both
  // have emitted at least one value.
  //
  // The returned stream is a broadcast stream exactly when this stream is.
  // The controller is synchronous, so results are forwarded within the same
  // event that produced them.
  final controller = isBroadcast
      ? StreamController<S>.broadcast(sync: true)
      : StreamController<S>(sync: true);

  // A broadcast result may be listened to more than once, so a
  // single-subscription [other] is converted to a broadcast stream.
  other =
      (isBroadcast && !other.isBroadcast) ? other.asBroadcastStream() : other;

  // Both subscriptions are created in `onListen` and reset to null in
  // `onCancel`.
  StreamSubscription<T>? sourceSubscription;
  StreamSubscription<T2>? otherSubscription;

  var sourceDone = false;
  var otherDone = false;

  // Only read after the matching `*Started` flag has been set.
  late T latestSource;
  late T2 latestOther;

  var sourceStarted = false;
  var otherStarted = false;

  // Calls [combine] with the latest pair and forwards the outcome. Does
  // nothing until both streams have produced a value.
  void emitCombined() {
    if (!sourceStarted || !otherStarted) return;
    FutureOr<S> result;
    try {
      result = combine(latestSource, latestOther);
    } catch (e, s) {
      // A synchronous throw from [combine] becomes an error event.
      controller.addError(e, s);
      return;
    }
    if (result is Future<S>) {
      // Hold back further input until the asynchronous result is delivered.
      sourceSubscription!.pause();
      otherSubscription!.pause();
      result
          .then(controller.add, onError: controller.addError)
          .whenComplete(() {
        // The listener may have cancelled while the future was pending, in
        // which case `onCancel` has already cleared both subscriptions.
        // Resume null-aware so that case does not raise a null-check error
        // as an unhandled asynchronous error.
        sourceSubscription?.resume();
        otherSubscription?.resume();
      });
    } else {
      controller.add(result);
    }
  }

  controller.onListen = () {
    assert(sourceSubscription == null);
    sourceSubscription = listen(
        (s) {
          sourceStarted = true;
          latestSource = s;
          emitCombined();
        },
        onError: controller.addError,
        onDone: () {
          sourceDone = true;
          if (otherDone) {
            controller.close();
          } else if (!sourceStarted) {
            // Nothing can ever be emitted
            otherSubscription!.cancel();
            controller.close();
          }
        });
    otherSubscription = other.listen(
        (o) {
          otherStarted = true;
          latestOther = o;
          emitCombined();
        },
        onError: controller.addError,
        onDone: () {
          otherDone = true;
          if (sourceDone) {
            controller.close();
          } else if (!otherStarted) {
            // Nothing can ever be emitted
            sourceSubscription!.cancel();
            controller.close();
          }
        });
    // Pause and resume are only forwarded for a single-subscription result.
    if (!isBroadcast) {
      controller
        ..onPause = () {
          sourceSubscription!.pause();
          otherSubscription!.pause();
        }
        ..onResume = () {
          sourceSubscription!.resume();
          otherSubscription!.resume();
        };
    }
    // Cancels both inputs and completes once both cancellations have.
    controller.onCancel = () {
      var cancels = [
        sourceSubscription!.cancel(),
        otherSubscription!.cancel()
      ]
        // Handle opt-out nulls
        ..removeWhere((Object? f) => f == null);
      sourceSubscription = null;
      otherSubscription = null;
      return Future.wait(cancels).then((_) => null);
    };
  };
  return controller.stream;
}