Stream transformByHandlers()

in lib/src/from_handlers.dart [13:49]


  Stream<T> transformByHandlers<T>(
      {void Function(S, EventSink<T>)? onData,
      void Function(Object, StackTrace, EventSink<T>)? onError,
      void Function(EventSink<T>)? onDone}) {
    final handleData = onData ?? _defaultHandleData;
    final handleError = onError ?? _defaultHandleError;
    final handleDone = onDone ?? _defaultHandleDone;

    var controller = isBroadcast
        ? StreamController<T>.broadcast(sync: true)
        : StreamController<T>(sync: true);

    StreamSubscription<S>? subscription;
    controller.onListen = () {
      assert(subscription == null);
      var valuesDone = false;
      subscription = listen((value) => handleData(value, controller),
          onError: (Object error, StackTrace stackTrace) {
        handleError(error, stackTrace, controller);
      }, onDone: () {
        valuesDone = true;
        handleDone(controller);
      });
      if (!isBroadcast) {
        controller
          ..onPause = subscription!.pause
          ..onResume = subscription!.resume;
      }
      controller.onCancel = () {
        var toCancel = subscription;
        subscription = null;
        if (!valuesDone) return toCancel!.cancel();
        return null;
      };
    };
    return controller.stream;
  }