in lib/src/aggregate_sample.dart [17:115]
/// Aggregates values from this stream and emits the aggregate each time
/// [trigger] fires.
///
/// Every value from this stream is folded into a pending result by calling
/// [aggregate] with the new value and the previous pending result (`null`
/// when nothing is pending). An event on [trigger] emits the pending result
/// and clears it. If [trigger] fires while nothing is pending, the next
/// value is aggregated and emitted as soon as it arrives; several such
/// trigger events in a row count as one.
///
/// The returned stream is a broadcast stream if this stream is a broadcast
/// stream, and it delivers events synchronously. Errors from this stream and
/// from [trigger] are forwarded to the returned stream.
///
/// The returned stream closes when:
/// - this stream is done and no result is pending;
/// - this stream is done and a later [trigger] event arrives;
/// - [trigger] is done while no trigger event is outstanding (any pending
///   result is dropped, since nothing can emit it any more);
/// - [trigger] is done and the value an outstanding trigger was waiting for
///   arrives.
Stream<S> aggregateSample<S>(
    Stream<void> trigger, S Function(T, S?) aggregate) {
  var controller = isBroadcast
      ? StreamController<S>.broadcast(sync: true)
      : StreamController<S>(sync: true);

  // The aggregate collected since the last emit; only meaningful while
  // `hasCurrentResults` is true (S itself may be nullable).
  S? currentResults;
  var hasCurrentResults = false;
  // False while a trigger event has fired but no value has arrived yet to
  // satisfy it.
  var waitingForTrigger = true;
  var isTriggerDone = false;
  var isValueDone = false;
  StreamSubscription<T>? valueSub;
  StreamSubscription<void>? triggerSub;

  void emit() {
    // The controller is synchronous, so listeners run inside `add` and may
    // re-enter `onValue`/`onTrigger` or cancel the subscription. Reset all
    // state first so re-entrant callbacks never see an already-emitted
    // result.
    final results = currentResults as S;
    currentResults = null;
    hasCurrentResults = false;
    waitingForTrigger = true;
    controller.add(results);
  }

  void onValue(T value) {
    currentResults = aggregate(value, currentResults);
    hasCurrentResults = true;
    // A trigger already fired and was waiting for this value.
    if (!waitingForTrigger) emit();
    if (isTriggerDone) {
      // No further trigger can arrive, so nothing else can be emitted.
      // The listener may have cancelled synchronously during `emit`, in
      // which case `onCancel` has already cleared `valueSub`.
      valueSub?.cancel();
      controller.close();
    }
  }

  void onValuesDone() {
    isValueDone = true;
    // With a pending result, stay open until the next trigger emits it.
    if (!hasCurrentResults) {
      triggerSub?.cancel();
      controller.close();
    }
  }

  void onTrigger(_) {
    waitingForTrigger = false;
    if (hasCurrentResults) emit();
    if (isValueDone) {
      // The last pending result has been flushed; nothing more can come.
      // The listener may have cancelled synchronously during `emit`, in
      // which case `onCancel` has already cleared `triggerSub`.
      triggerSub?.cancel();
      controller.close();
    }
  }

  void onTriggerDone() {
    isTriggerDone = true;
    // If a trigger is still outstanding, stay open so the next value can be
    // emitted; `onValue` closes the controller afterwards.
    if (waitingForTrigger) {
      valueSub?.cancel();
      controller.close();
    }
  }

  controller.onListen = () {
    assert(valueSub == null);
    valueSub =
        listen(onValue, onError: controller.addError, onDone: onValuesDone);
    final priorTriggerSub = triggerSub;
    if (priorTriggerSub != null) {
      // A subscription kept (paused) across a previous cancel; see
      // `onCancel` below.
      if (priorTriggerSub.isPaused) priorTriggerSub.resume();
    } else {
      triggerSub = trigger.listen(onTrigger,
          onError: controller.addError, onDone: onTriggerDone);
    }
    if (!isBroadcast) {
      // Only a single-subscription output forwards pause and resume to both
      // sources.
      controller
        ..onPause = () {
          valueSub?.pause();
          triggerSub?.pause();
        }
        ..onResume = () {
          valueSub?.resume();
          triggerSub?.resume();
        };
    }
    controller.onCancel = () {
      var cancels = <Future<void>>[if (!isValueDone) valueSub!.cancel()];
      valueSub = null;
      if (trigger.isBroadcast || !isBroadcast) {
        if (!isTriggerDone) cancels.add(triggerSub!.cancel());
        triggerSub = null;
      } else {
        // Broadcast output over a single-subscription trigger: the trigger
        // cannot be listened to a second time, so pause the subscription
        // and resume it on the next listen instead of cancelling.
        triggerSub!.pause();
      }
      // Handle opt-out nulls
      cancels.removeWhere((Object? f) => f == null);
      if (cancels.isEmpty) return null;
      return Future.wait(cancels).then((_) => null);
    };
  };
  return controller.stream;
}