Stream _debounceAggregate()

in lib/src/rate_limit.dart [239:277]


  /// Debounces this stream, folding each burst of events into one value.
  ///
  /// Every incoming event is merged into a running aggregate through
  /// [collect] and restarts a timer of length [duration], so the timer only
  /// expires once [duration] has passed without a new event.
  ///
  /// * With [leading], an event that arrives while no timer is pending is
  ///   emitted immediately.
  /// * With [trailing], the aggregate is emitted when the timer expires,
  ///   unless the most recent event was already emitted as a leading event.
  ///
  /// If the source finishes while a trailing emission is still outstanding,
  /// closing the output is postponed until the timer fires; otherwise any
  /// pending timer is cancelled and the output is closed right away.
  Stream<S> _debounceAggregate<S>(
      Duration duration, S Function(T element, S? soFar) collect,
      {required bool leading, required bool trailing}) {
    // Replaced on every event; reset to null once it fires.
    Timer? window;
    S? aggregate;
    var awaitingEmit = false;
    var closeWhenWindowEnds = false;
    var latestWentOutAsLeading = false;

    // Hands the current aggregate to [sink] and starts a fresh accumulation.
    void flush(EventSink<S> sink) {
      sink.add(aggregate as S);
      aggregate = null;
      awaitingEmit = false;
    }

    return transformByHandlers(onData: (value, sink) {
      // Captured before the restart below: a null timer means this event
      // opens a new window rather than extending a running one.
      final wasIdle = window == null;
      window?.cancel();
      aggregate = collect(value, aggregate);
      awaitingEmit = true;
      latestWentOutAsLeading = wasIdle && leading;
      if (latestWentOutAsLeading) flush(sink);
      window = Timer(duration, () {
        // Skip the trailing emit when the latest event already went out as
        // the leading one, so it is not delivered twice.
        if (!latestWentOutAsLeading && trailing) flush(sink);
        if (closeWhenWindowEnds) sink.close();
        window = null;
      });
    }, onDone: (EventSink<S> sink) {
      if (!awaitingEmit || !trailing) {
        // Nothing left to deliver: stop the timer and finish immediately.
        window?.cancel();
        sink.close();
        return;
      }
      // Let the running timer deliver the last aggregate, then close.
      closeWhenWindowEnds = true;
    });
  }