Stream startDecoding()

in lib/src/frames/frame_reader.dart [20:117]


  /// Returns the stream of [Frame]s decoded from `_inputStream`.
  ///
  /// Nothing is read until the returned stream gets a listener. From then on
  /// every incoming byte chunk is appended to an internal buffer and each
  /// complete frame found in that buffer is emitted, in order. Pausing and
  /// resuming the returned stream pauses and resumes the input subscription.
  ///
  /// A [FrameSizeException] is emitted when a frame header carries a length
  /// greater than `_localSettings.maxFrameSize`, or when the input finishes
  /// while bytes are still buffered. Errors coming from the input stream and
  /// errors thrown while decoding are forwarded as well. After any of these
  /// the input subscription is cancelled and the returned stream is closed.
  Stream<Frame> startDecoding() {
    // Chunks received so far that have not been consumed by a frame yet,
    // together with the total number of bytes they hold.
    final pendingChunks = <List<int>>[];
    var pendingBytes = 0;

    // Decodes the header at the front of the buffer, or returns `null` while
    // fewer than FRAME_HEADER_SIZE bytes are available.
    FrameHeader? readHeaderIfAvailable() {
      if (pendingBytes < FRAME_HEADER_SIZE) return null;

      // Make sure the first chunk alone holds the complete header.
      _mergeLists(pendingChunks, FRAME_HEADER_SIZE);
      return _readFrameHeader(pendingChunks[0], 0);
    }

    // Decodes the frame described by [header] and drops its bytes from the
    // buffer, or returns `null` while the frame is not fully buffered.
    Frame? readFrameIfAvailable(FrameHeader header) {
      final frameSize = FRAME_HEADER_SIZE + header.length;
      if (pendingBytes < frameSize) return null;

      // Make sure the first chunk alone holds the complete frame.
      _mergeLists(pendingChunks, frameSize);
      final head = pendingChunks[0];
      final frame = _readFrame(header, head, FRAME_HEADER_SIZE);

      // Discard the consumed bytes, keeping whatever follows the frame.
      final leftover = head.length - frameSize;
      if (leftover == 0) {
        pendingChunks.removeAt(0);
      } else {
        pendingChunks[0] = viewOrSublist(head, frameSize, leftover);
      }
      pendingBytes -= frameSize;

      return frame;
    }

    _framesController.onListen = () {
      // Header of the frame currently being assembled; kept across chunks
      // until the frame's remaining bytes have arrived.
      FrameHeader? pendingHeader;

      // Assigned below; `late` because [failAndClose] refers to it first.
      late StreamSubscription<List<int>> inputSubscription;

      // Reports [error], stops reading the input and closes the frame stream.
      void failAndClose(Object error, [StackTrace? stack]) {
        pendingHeader = null;
        _framesController.addError(error, stack);
        inputSubscription.cancel();
        _framesController.close();
      }

      // Buffers [chunk] and emits every frame that is now complete.
      void handleChunk(List<int> chunk) {
        pendingChunks.add(chunk);
        pendingBytes += chunk.length;

        try {
          while (true) {
            pendingHeader ??= readHeaderIfAvailable();
            final current = pendingHeader;
            if (current == null) break;

            // Reject oversized frames before waiting for their contents.
            if (current.length > _localSettings.maxFrameSize) {
              failAndClose(FrameSizeException('Incoming frame is too big.'));
              return;
            }

            final frame = readFrameIfAvailable(current);
            if (frame == null) break;

            _framesController.add(frame);
            pendingHeader = null;
          }
        } catch (error, stack) {
          failAndClose(error, stack);
        }
      }

      // Closes the frame stream; leftover bytes mean a truncated frame.
      void handleDone() {
        if (pendingBytes != 0) {
          failAndClose(FrameSizeException(
              'Incoming byte stream ended with incomplete frame'));
          return;
        }
        _framesController.close();
      }

      inputSubscription = _inputStream.listen(
        handleChunk,
        onError: (Object error, StackTrace stack) {
          failAndClose(error, stack);
        },
        onDone: handleDone,
      );

      // Propagate back-pressure from the frame stream to the input stream.
      _framesController.onPause = inputSubscription.pause;
      _framesController.onResume = inputSubscription.resume;
    };

    return _framesController.stream;
  }