Stream takeUntil()

in lib/src/take_until.dart [16:51]


  /// Forwards the events of this stream until [trigger] completes, then
  /// closes the returned stream.
  ///
  /// The returned stream is a broadcast stream if this stream is a broadcast
  /// stream, and a single-subscription stream otherwise. This stream is not
  /// listened to until the returned stream is listened to.
  ///
  /// If [trigger] completes with a value, the subscription on this stream is
  /// canceled and the returned stream closes. If [trigger] has already
  /// completed by the time the returned stream is first listened to, this
  /// stream is never listened to.
  ///
  /// If [trigger] completes with an error, the subscription on this stream is
  /// canceled, the error is forwarded through the returned stream, and the
  /// returned stream then closes.
  ///
  /// If this stream ends before [trigger] completes, the returned stream
  /// closes and any later completion of [trigger], as a value or as an error,
  /// is ignored.
  Stream<T> takeUntil(Future<void> trigger) {
    var controller = isBroadcast
        ? StreamController<T>.broadcast(sync: true)
        : StreamController<T>(sync: true);

    StreamSubscription<T>? subscription;
    // Set once the result stream has been closed, whether by the trigger or
    // by the source ending, so that whichever happens second is a no-op.
    var isDone = false;
    trigger.then((_) {
      if (isDone) return;
      isDone = true;
      subscription?.cancel();
      controller.close();
    }, onError: (Object error, StackTrace stackTrace) {
      // Without this handler a failing trigger would surface as an unhandled
      // asynchronous error and would never end the result stream.
      if (isDone) return;
      isDone = true;
      subscription?.cancel();
      controller
        ..addError(error, stackTrace)
        ..close();
    });

    controller.onListen = () {
      // The trigger already fired; the controller is closed and the source
      // must not be listened to.
      if (isDone) return;
      subscription =
          listen(controller.add, onError: controller.addError, onDone: () {
        if (isDone) return;
        isDone = true;
        controller.close();
      });
      // Pause and resume are only forwarded for single-subscription streams.
      if (!isBroadcast) {
        controller
          ..onPause = subscription!.pause
          ..onResume = subscription!.resume;
      }
      controller.onCancel = () {
        // Once done, the source subscription has either finished on its own
        // or been canceled by the trigger callbacks above.
        if (isDone) return null;
        var toCancel = subscription!;
        subscription = null;
        return toCancel.cancel();
      };
    };
    return controller.stream;
  }