Future _setUpListener()

in lib/src/server/sse_handler.dart [78:113]


  Future<void> _setUpListener() async {
    while (
        !_outgoingController.isClosed && await _outgoingStreamQueue.hasNext) {
      // If we're in a KeepAlive timeout, there's nowhere to send messages so
      // wait a short period and check again.
      if (isInKeepAlivePeriod) {
        await Future.delayed(const Duration(milliseconds: 200));
        continue;
      }

      // Peek the data so we don't remove it from the stream if we're unable to
      // send it.
      final data = await _outgoingStreamQueue.peek;

      // Ignore outgoing messages since the connection may have closed while
      // waiting for the keep alive.
      if (_closedCompleter.isCompleted) break;

      try {
        // JSON encode the message to escape new lines.
        _sink.add('data: ${json.encode(data)}\n');
        _sink.add('\n');
        await _outgoingStreamQueue.next; // Consume from stream if no errors.
      } catch (e) {
        if ((e is StateError || e is SocketException) &&
            (_keepAlive != null && !_closedCompleter.isCompleted)) {
          // If we got here then the sink may have closed but the stream.onDone
          // hasn't fired yet, so pause the subscription and skip calling
          // `next` so the message remains in the queue to try again.
          _handleDisconnect();
        } else {
          rethrow;
        }
      }
    }
  }