in lib/src/server/sse_handler.dart [78:113]
/// Forwards queued outgoing messages to [_sink] as SSE `data:` frames.
///
/// The loop ends when [_outgoingController] is closed, when
/// [_outgoingStreamQueue] reports no further events, or when
/// [_closedCompleter] has completed by the time a message is ready to send.
///
/// Each message is peeked first and only consumed from the queue after both
/// writes to the sink succeed, so a message whose write fails stays queued.
///
/// A [StateError] or [SocketException] raised while sending is passed to
/// [_handleDisconnect] when [_keepAlive] is set and the connection has not
/// been closed; any other error is rethrown to the caller.
Future<void> _setUpListener() async {
  while (true) {
    if (_outgoingController.isClosed) break;
    if (!await _outgoingStreamQueue.hasNext) break;

    // During a keep-alive period there is nowhere to deliver messages, so
    // back off briefly and re-evaluate the loop guards.
    if (isInKeepAlivePeriod) {
      await Future.delayed(const Duration(milliseconds: 200));
      continue;
    }

    // Look at the head of the queue without removing it; it is only consumed
    // once it has actually been written.
    final message = await _outgoingStreamQueue.peek;

    // The connection may have been closed while we were waiting above, in
    // which case the pending message is dropped.
    if (_closedCompleter.isCompleted) break;

    try {
      // JSON encoding escapes embedded new lines, keeping the payload on a
      // single `data:` line.
      final frame = 'data: ${json.encode(message)}\n';
      _sink.add(frame);
      _sink.add('\n');
      // Both writes succeeded, so the message can leave the queue.
      await _outgoingStreamQueue.next;
    } catch (error) {
      // Only sink-related failures are candidates for recovery.
      if (error is! StateError && error is! SocketException) rethrow;
      // Recovery requires keep-alive to be configured and the connection to
      // still be open.
      if (_keepAlive == null || _closedCompleter.isCompleted) rethrow;
      // The sink may have closed before the stream's onDone fired. `next`
      // was not reached, so the message remains queued for another attempt.
      _handleDisconnect();
    }
  }
}