Stream combineLatest()

in lib/src/combine_latest.dart [36:140]


  /// Combines the most recent event from this stream with the most recent
  /// event from [other] by calling [combine].
  ///
  /// Nothing is emitted until both streams have produced at least one event.
  /// After that, every event from either stream triggers a call to [combine]
  /// with the latest value from each side, and the result is emitted.
  ///
  /// If [combine] throws synchronously, the error is forwarded to the result
  /// stream. If [combine] returns a [Future], both input subscriptions are
  /// paused until it completes, and its value or error is then forwarded.
  ///
  /// Errors from either input are forwarded. The result stream closes once
  /// both inputs are done, or as soon as one input finishes without having
  /// emitted anything, since no combination could ever be produced.
  ///
  /// The result is a broadcast stream if this stream is a broadcast stream; in
  /// that case a single-subscription [other] is listened to through
  /// [Stream.asBroadcastStream].
  Stream<S> combineLatest<T2, S>(
      Stream<T2> other, FutureOr<S> Function(T, T2) combine) {
    final controller = isBroadcast
        ? StreamController<S>.broadcast(sync: true)
        : StreamController<S>(sync: true);

    // A broadcast result may be listened to repeatedly, so `other` must
    // tolerate multiple listens as well.
    other =
        (isBroadcast && !other.isBroadcast) ? other.asBroadcastStream() : other;

    StreamSubscription<T>? sourceSubscription;
    StreamSubscription<T2>? otherSubscription;

    var sourceDone = false;
    var otherDone = false;

    // Only read once the matching `*Started` flag is true.
    late T latestSource;
    late T2 latestOther;

    var sourceStarted = false;
    var otherStarted = false;

    void emitCombined() {
      // Wait until each side has delivered at least one value.
      if (!sourceStarted || !otherStarted) return;
      FutureOr<S> result;
      try {
        result = combine(latestSource, latestOther);
      } catch (e, s) {
        controller.addError(e, s);
        return;
      }
      if (result is Future<S>) {
        // Remember exactly which subscriptions get paused here. By the time
        // the future completes the listener may have cancelled (which clears
        // the shared variables) or, for a broadcast result, cancelled and
        // listened again (which replaces them).
        final pausedSource = sourceSubscription!;
        final pausedOther = otherSubscription!;
        pausedSource.pause();
        pausedOther.pause();
        result
            .then(controller.add, onError: controller.addError)
            .whenComplete(() {
          // Only undo the pause on subscriptions that are still current;
          // never dereference a cleared variable and never resume a newer
          // subscription that this call did not pause.
          if (identical(sourceSubscription, pausedSource)) {
            pausedSource.resume();
          }
          if (identical(otherSubscription, pausedOther)) {
            pausedOther.resume();
          }
        });
      } else {
        controller.add(result);
      }
    }

    controller.onListen = () {
      assert(sourceSubscription == null);
      sourceSubscription = listen(
          (s) {
            sourceStarted = true;
            latestSource = s;
            emitCombined();
          },
          onError: controller.addError,
          onDone: () {
            sourceDone = true;
            if (otherDone) {
              controller.close();
            } else if (!sourceStarted) {
              // Nothing can ever be emitted
              otherSubscription!.cancel();
              controller.close();
            }
          });
      otherSubscription = other.listen(
          (o) {
            otherStarted = true;
            latestOther = o;
            emitCombined();
          },
          onError: controller.addError,
          onDone: () {
            otherDone = true;
            if (sourceDone) {
              controller.close();
            } else if (!otherStarted) {
              // Nothing can ever be emitted
              sourceSubscription!.cancel();
              controller.close();
            }
          });
      // Pause and resume are only forwarded for a single-subscription result.
      if (!isBroadcast) {
        controller
          ..onPause = () {
            sourceSubscription!.pause();
            otherSubscription!.pause();
          }
          ..onResume = () {
            sourceSubscription!.resume();
            otherSubscription!.resume();
          };
      }
      controller.onCancel = () {
        var cancels = [
          sourceSubscription!.cancel(),
          otherSubscription!.cancel()
        ]
          // Handle opt-out nulls
          ..removeWhere((Object? f) => f == null);
        // Clearing these lets a broadcast result be listened to again and
        // tells any pending async `combine` that its subscriptions are gone.
        sourceSubscription = null;
        otherSubscription = null;
        return Future.wait(cancels).then((_) => null);
      };
    };
    return controller.stream;
  }