Stream bind()

in lib/src/stream_closer.dart [61:107]


  Stream<T> bind(Stream<T> stream) {
    var controller = stream.isBroadcast
        ? StreamController<T>.broadcast(sync: true)
        : StreamController<T>(sync: true);

    controller.onListen = () {
      if (isClosed) {
        // Ignore errors here, because otherwise there would be no way for the
        // user to handle them gracefully.
        stream.listen(null).cancel().catchError((_) {});
        return;
      }

      var subscription =
          stream.listen(controller.add, onError: controller.addError);
      subscription.onDone(() {
        _subscriptions.remove(subscription);
        _controllers.remove(controller);
        controller.close();
      });
      _subscriptions.add(subscription);

      if (!stream.isBroadcast) {
        controller.onPause = subscription.pause;
        controller.onResume = subscription.resume;
      }

      controller.onCancel = () {
        _controllers.remove(controller);

        // If the subscription has already been removed, that indicates that the
        // underlying stream has been cancelled by [close] and its cancellation
        // future has been handled there. In that case, we shouldn't forward it
        // here as well.
        if (_subscriptions.remove(subscription)) return subscription.cancel();
        return null;
      };
    };

    if (isClosed) {
      controller.close();
    } else {
      _controllers.add(controller);
    }

    return controller.stream;
  }