in lib/src/stream_closer.dart [61:107]
Stream<T> bind(Stream<T> stream) {
var controller = stream.isBroadcast
? StreamController<T>.broadcast(sync: true)
: StreamController<T>(sync: true);
controller.onListen = () {
if (isClosed) {
// Ignore errors here, because otherwise there would be no way for the
// user to handle them gracefully.
stream.listen(null).cancel().catchError((_) {});
return;
}
var subscription =
stream.listen(controller.add, onError: controller.addError);
subscription.onDone(() {
_subscriptions.remove(subscription);
_controllers.remove(controller);
controller.close();
});
_subscriptions.add(subscription);
if (!stream.isBroadcast) {
controller.onPause = subscription.pause;
controller.onResume = subscription.resume;
}
controller.onCancel = () {
_controllers.remove(controller);
// If the subscription has already been removed, that indicates that the
// underlying stream has been cancelled by [close] and its cancellation
// future has been handled there. In that case, we shouldn't forward it
// here as well.
if (_subscriptions.remove(subscription)) return subscription.cancel();
return null;
};
};
if (isClosed) {
controller.close();
} else {
_controllers.add(controller);
}
return controller.stream;
}