def _feed_data()

in HowTo/gRPC/Linux/OpenAI/LangChain/PyServer/venv/Lib/aiohttp/http_websocket.py [0:0]


    def _feed_data(self, data: bytes) -> Tuple[bool, bytes]:
        for fin, opcode, payload, compressed in self.parse_frame(data):
            if compressed and not self._decompressobj:
                self._decompressobj = zlib.decompressobj(wbits=-zlib.MAX_WBITS)
            if opcode == WSMsgType.CLOSE:
                if len(payload) >= 2:
                    close_code = UNPACK_CLOSE_CODE(payload[:2])[0]
                    if close_code < 3000 and close_code not in ALLOWED_CLOSE_CODES:
                        raise WebSocketError(
                            WSCloseCode.PROTOCOL_ERROR,
                            f"Invalid close code: {close_code}",
                        )
                    try:
                        close_message = payload[2:].decode("utf-8")
                    except UnicodeDecodeError as exc:
                        raise WebSocketError(
                            WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
                        ) from exc
                    msg = WSMessage(WSMsgType.CLOSE, close_code, close_message)
                elif payload:
                    raise WebSocketError(
                        WSCloseCode.PROTOCOL_ERROR,
                        f"Invalid close frame: {fin} {opcode} {payload!r}",
                    )
                else:
                    msg = WSMessage(WSMsgType.CLOSE, 0, "")

                self.queue.feed_data(msg, 0)

            elif opcode == WSMsgType.PING:
                self.queue.feed_data(
                    WSMessage(WSMsgType.PING, payload, ""), len(payload)
                )

            elif opcode == WSMsgType.PONG:
                self.queue.feed_data(
                    WSMessage(WSMsgType.PONG, payload, ""), len(payload)
                )

            elif (
                opcode not in (WSMsgType.TEXT, WSMsgType.BINARY)
                and self._opcode is None
            ):
                raise WebSocketError(
                    WSCloseCode.PROTOCOL_ERROR, f"Unexpected opcode={opcode!r}"
                )
            else:
                # load text/binary
                if not fin:
                    # got partial frame payload
                    if opcode != WSMsgType.CONTINUATION:
                        self._opcode = opcode
                    self._partial.extend(payload)
                    if self._max_msg_size and len(self._partial) >= self._max_msg_size:
                        raise WebSocketError(
                            WSCloseCode.MESSAGE_TOO_BIG,
                            "Message size {} exceeds limit {}".format(
                                len(self._partial), self._max_msg_size
                            ),
                        )
                else:
                    # previous frame was non finished
                    # we should get continuation opcode
                    if self._partial:
                        if opcode != WSMsgType.CONTINUATION:
                            raise WebSocketError(
                                WSCloseCode.PROTOCOL_ERROR,
                                "The opcode in non-fin frame is expected "
                                "to be zero, got {!r}".format(opcode),
                            )

                    if opcode == WSMsgType.CONTINUATION:
                        assert self._opcode is not None
                        opcode = self._opcode
                        self._opcode = None

                    self._partial.extend(payload)
                    if self._max_msg_size and len(self._partial) >= self._max_msg_size:
                        raise WebSocketError(
                            WSCloseCode.MESSAGE_TOO_BIG,
                            "Message size {} exceeds limit {}".format(
                                len(self._partial), self._max_msg_size
                            ),
                        )

                    # Decompress process must to be done after all packets
                    # received.
                    if compressed:
                        self._partial.extend(_WS_DEFLATE_TRAILING)
                        payload_merged = self._decompressobj.decompress(
                            self._partial, self._max_msg_size
                        )
                        if self._decompressobj.unconsumed_tail:
                            left = len(self._decompressobj.unconsumed_tail)
                            raise WebSocketError(
                                WSCloseCode.MESSAGE_TOO_BIG,
                                "Decompressed message size {} exceeds limit {}".format(
                                    self._max_msg_size + left, self._max_msg_size
                                ),
                            )
                    else:
                        payload_merged = bytes(self._partial)

                    self._partial.clear()

                    if opcode == WSMsgType.TEXT:
                        try:
                            text = payload_merged.decode("utf-8")
                            self.queue.feed_data(
                                WSMessage(WSMsgType.TEXT, text, ""), len(text)
                            )
                        except UnicodeDecodeError as exc:
                            raise WebSocketError(
                                WSCloseCode.INVALID_TEXT, "Invalid UTF-8 text message"
                            ) from exc
                    else:
                        self.queue.feed_data(
                            WSMessage(WSMsgType.BINARY, payload_merged, ""),
                            len(payload_merged),
                        )

        return False, b""