Stream aggregateSample()

in lib/src/aggregate_sample.dart [17:115]


  Stream<S> aggregateSample<S>(
      Stream<void> trigger, S Function(T, S?) aggregate) {
    var controller = isBroadcast
        ? StreamController<S>.broadcast(sync: true)
        : StreamController<S>(sync: true);

    S? currentResults;
    var hasCurrentResults = false;
    var waitingForTrigger = true;
    var isTriggerDone = false;
    var isValueDone = false;
    StreamSubscription<T>? valueSub;
    StreamSubscription<void>? triggerSub;

    void emit() {
      controller.add(currentResults as S);
      currentResults = null;
      hasCurrentResults = false;
      waitingForTrigger = true;
    }

    void onValue(T value) {
      currentResults = aggregate(value, currentResults);
      hasCurrentResults = true;

      if (!waitingForTrigger) emit();

      if (isTriggerDone) {
        valueSub!.cancel();
        controller.close();
      }
    }

    void onValuesDone() {
      isValueDone = true;
      if (!hasCurrentResults) {
        triggerSub?.cancel();
        controller.close();
      }
    }

    void onTrigger(_) {
      waitingForTrigger = false;

      if (hasCurrentResults) emit();

      if (isValueDone) {
        triggerSub!.cancel();
        controller.close();
      }
    }

    void onTriggerDone() {
      isTriggerDone = true;
      if (waitingForTrigger) {
        valueSub?.cancel();
        controller.close();
      }
    }

    controller.onListen = () {
      assert(valueSub == null);
      valueSub =
          listen(onValue, onError: controller.addError, onDone: onValuesDone);
      final priorTriggerSub = triggerSub;
      if (priorTriggerSub != null) {
        if (priorTriggerSub.isPaused) priorTriggerSub.resume();
      } else {
        triggerSub = trigger.listen(onTrigger,
            onError: controller.addError, onDone: onTriggerDone);
      }
      if (!isBroadcast) {
        controller
          ..onPause = () {
            valueSub?.pause();
            triggerSub?.pause();
          }
          ..onResume = () {
            valueSub?.resume();
            triggerSub?.resume();
          };
      }
      controller.onCancel = () {
        var cancels = <Future<void>>[if (!isValueDone) valueSub!.cancel()];
        valueSub = null;
        if (trigger.isBroadcast || !isBroadcast) {
          if (!isTriggerDone) cancels.add(triggerSub!.cancel());
          triggerSub = null;
        } else {
          triggerSub!.pause();
        }
        // Handle opt-out nulls
        cancels.removeWhere((Object? f) => f == null);
        if (cancels.isEmpty) return null;
        return Future.wait(cancels).then((_) => null);
      };
    };
    return controller.stream;
  }