Stream switchLatest()

in lib/src/switch.dart [56:134]


  /// Returns a stream that forwards events from the stream most recently
  /// emitted by this stream.
  ///
  /// Each time this (outer) stream emits an inner stream, the subscription to
  /// the previously emitted inner stream, if still active, is cancelled and
  /// the new inner stream is listened to instead. Data and errors from the
  /// active inner stream, as well as errors from the outer stream, are
  /// forwarded to the result.
  ///
  /// The result is a broadcast stream if this stream is a broadcast stream,
  /// and a single-subscription stream otherwise.
  ///
  /// The result closes once the outer stream is done and there is no active
  /// inner subscription left.
  Stream<T> switchLatest() {
    var controller = isBroadcast
        ? StreamController<T>.broadcast(sync: true)
        : StreamController<T>(sync: true);

    controller.onListen = () {
      // State lives inside `onListen`, so every invocation of the callback
      // starts with a fresh subscription pair.
      StreamSubscription<T>? innerSubscription;
      var outerStreamDone = false;

      // Subscribes to [innerStream] and records the subscription as the
      // active one. Must only be called while there is no active inner
      // subscription.
      void listenToInnerStream(Stream<T> innerStream) {
        assert(innerSubscription == null);
        var subscription = innerStream
            .listen(controller.add, onError: controller.addError, onDone: () {
          innerSubscription = null;
          // The inner stream was the last thing keeping the result open.
          if (outerStreamDone) controller.close();
        });
        // If a pause happens during an innerSubscription.cancel,
        // we still listen to the next stream when the cancel is done.
        // Then we immediately pause it again here.
        if (controller.isPaused) subscription.pause();
        innerSubscription = subscription;
      }

      var addError = controller.addError;
      // The data handler is attached separately below because it needs to
      // refer to `outerSubscription` itself.
      final outerSubscription = listen(null, onError: addError, onDone: () {
        outerStreamDone = true;
        // With no active inner subscription there is nothing left to wait for.
        if (innerSubscription == null) controller.close();
      });
      outerSubscription.onData((innerStream) async {
        var currentSubscription = innerSubscription;
        if (currentSubscription == null) {
          // Nothing to cancel first; switch immediately.
          listenToInnerStream(innerStream);
          return;
        }
        // Clear the active subscription and pause the outer subscription
        // while the previous inner subscription is being cancelled.
        innerSubscription = null;
        outerSubscription.pause();
        try {
          await currentSubscription.cancel();
        } catch (error, stack) {
          // A failing cancel is reported on the result stream; the switch to
          // the new inner stream still happens in `finally`.
          controller.addError(error, stack);
        } finally {
          // NOTE(review): for a broadcast result the first branch is never
          // taken, so the new inner stream is listened to even if every
          // listener cancelled during the await above - confirm intended.
          if (!isBroadcast && !controller.hasListener) {
            // Result single-subscription stream subscription was cancelled
            // while waiting for previous innerStream cancel.
            //
            // Ensure that the last received stream is also listened to and
            // cancelled, then do nothing further.
            innerStream.listen(null).cancel().ignore();
          } else {
            // Undo the pause from above, then switch to the new inner stream.
            outerSubscription.resume();
            listenToInnerStream(innerStream);
          }
        }
      });
      if (!isBroadcast) {
        // Pause and resume of the result are forwarded to both subscriptions;
        // these callbacks are only installed for single-subscription results.
        controller
          ..onPause = () {
            innerSubscription?.pause();
            outerSubscription.pause();
          }
          ..onResume = () {
            innerSubscription?.resume();
            outerSubscription.resume();
          };
      }
      controller.onCancel = () {
        // Copy to a local so the null check below promotes the type.
        var currentInner = innerSubscription;
        var cancels = [
          if (!outerStreamDone) outerSubscription.cancel(),
          if (currentInner != null) currentInner.cancel(),
        ]
          // Handle opt-out nulls
          ..removeWhere((Object? f) => f == null);
        if (cancels.isEmpty) return null;
        // `_ignore` is declared elsewhere in this library; presumably it
        // discards the list produced by `Future.wait` - verify at definition.
        return Future.wait(cancels).then(_ignore);
      };
    };
    return controller.stream;
  }