in lib/src/from_handlers.dart [13:49]
Stream<T> transformByHandlers<T>(
{void Function(S, EventSink<T>)? onData,
void Function(Object, StackTrace, EventSink<T>)? onError,
void Function(EventSink<T>)? onDone}) {
final handleData = onData ?? _defaultHandleData;
final handleError = onError ?? _defaultHandleError;
final handleDone = onDone ?? _defaultHandleDone;
var controller = isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
StreamSubscription<S>? subscription;
controller.onListen = () {
assert(subscription == null);
var valuesDone = false;
subscription = listen((value) => handleData(value, controller),
onError: (Object error, StackTrace stackTrace) {
handleError(error, stackTrace, controller);
}, onDone: () {
valuesDone = true;
handleDone(controller);
});
if (!isBroadcast) {
controller
..onPause = subscription!.pause
..onResume = subscription!.resume;
}
controller.onCancel = () {
var toCancel = subscription;
subscription = null;
if (!valuesDone) return toCancel!.cancel();
return null;
};
};
return controller.stream;
}