in lib/src/frames/frame_reader.dart [20:117]
/// Decodes the bytes of the input stream into a stream of [Frame]s.
///
/// The input stream is subscribed to lazily, when the returned stream gets a
/// listener. Pausing and resuming the returned stream pauses and resumes the
/// input subscription.
///
/// The returned stream emits an error and is then closed (with the input
/// subscription cancelled) when:
///  * a frame header announces a payload longer than the local
///    `maxFrameSize` setting ([FrameSizeException]),
///  * the input stream ends while undecoded bytes are still buffered
///    ([FrameSizeException]),
///  * the input stream reports an error, or decoding throws.
Stream<Frame> startDecoding() {
  // Received chunks that have not been consumed by a decoded frame yet, and
  // the total number of bytes they hold.
  final chunks = <List<int>>[];
  var pendingBytes = 0;

  // Returns the header of the next frame, or null while fewer than
  // FRAME_HEADER_SIZE bytes are buffered.
  FrameHeader? peekHeader() {
    if (pendingBytes < FRAME_HEADER_SIZE) return null;

    // Get at least FRAME_HEADER_SIZE bytes into the first chunk, then read
    // the header from its start.
    _mergeLists(chunks, FRAME_HEADER_SIZE);
    return _readFrameHeader(chunks[0], 0);
  }

  // Returns the frame described by [header] and drops its bytes from the
  // buffer, or returns null while the frame is not fully buffered.
  Frame? takeFrame(FrameHeader header) {
    final frameSize = FRAME_HEADER_SIZE + header.length;
    if (pendingBytes < frameSize) return null;

    // Get the whole frame into the first chunk before decoding it.
    _mergeLists(chunks, frameSize);
    final head = chunks[0];
    final frame = _readFrame(header, head, FRAME_HEADER_SIZE);

    // Drop the consumed bytes; keep whatever follows the frame in the first
    // chunk for the next round.
    final leftover = head.length - frameSize;
    if (leftover == 0) {
      chunks.removeAt(0);
    } else {
      chunks[0] = viewOrSublist(head, frameSize, leftover);
    }
    pendingBytes -= frameSize;
    return frame;
  }

  _framesController.onListen = () {
    // Header that has been parsed while its frame is still incomplete; kept
    // across data events so it is not parsed twice.
    FrameHeader? pendingHeader;
    late StreamSubscription<List<int>> inputSubscription;

    // Reports [error] to the consumer, stops reading input and closes the
    // frame stream.
    void fail(Object error, [StackTrace? stack]) {
      pendingHeader = null;
      _framesController.addError(error, stack);
      inputSubscription.cancel();
      _framesController.close();
    }

    // Buffers [data] and emits every frame that is now completely available.
    void onData(List<int> data) {
      chunks.add(data);
      pendingBytes += data.length;

      try {
        while (true) {
          final header = pendingHeader ??= peekHeader();
          if (header == null) break;

          // Reject oversized frames as soon as the header is known, before
          // waiting for the payload.
          if (header.length > _localSettings.maxFrameSize) {
            fail(FrameSizeException('Incoming frame is too big.'));
            return;
          }

          final frame = takeFrame(header);
          if (frame == null) break;

          _framesController.add(frame);
          pendingHeader = null;
        }
      } catch (error, stack) {
        fail(error, stack);
      }
    }

    // Closes the frame stream; leftover bytes mean the input ended in the
    // middle of a frame.
    void onDone() {
      if (pendingBytes != 0) {
        fail(FrameSizeException(
            'Incoming byte stream ended with incomplete frame'));
        return;
      }
      _framesController.close();
    }

    inputSubscription = _inputStream.listen(
      onData,
      onError: (Object error, StackTrace stack) => fail(error, stack),
      onDone: onDone,
    );

    // Forward pause/resume requests from the consumer to the input stream.
    _framesController.onPause = inputSubscription.pause;
    _framesController.onResume = inputSubscription.resume;
  };

  return _framesController.stream;
}