in AWSIoTPythonSDK/core/protocol/connection/cores.py [0:0]
def read(self, numberOfBytes):
# Check if we have enough data for paho
# _payloadDataBuffer will not be empty ony when the payload of a new wss frame
# has been unmasked.
if len(self._payloadDataBuffer) >= numberOfBytes:
ret = self._payloadDataBuffer[0:numberOfBytes]
self._payloadDataBuffer = self._payloadDataBuffer[numberOfBytes:]
# struct.unpack(fmt, string) # Py2.x
# struct.unpack(fmt, buffer) # Py3.x
# Here ret is always in bytes (buffer interface)
if sys.version_info[0] < 3: # Py2.x
ret = str(ret)
return ret
# Emmm, We don't. Try to buffer from the socket (It's a new wss frame).
if not self._hasOpByte: # Check if we need to buffer OpByte
opByte = self._bufferedReader.read(1)
self._isFIN = (opByte[0] & 0x80) == 0x80
self._RSVBits = (opByte[0] & 0x70)
self._opCode = (opByte[0] & 0x0f)
self._hasOpByte = True # Finished buffering opByte
# Check if any of the RSV bits are set, if so, close the connection
# since client never sends negotiated extensions
if self._RSVBits != 0x0:
self._closeWssConnection()
self._connectStatus = self._WebsocketDisconnected
self._payloadDataBuffer = bytearray()
raise socket.error(ssl.SSL_ERROR_WANT_READ, "RSV bits set with NO negotiated extensions.")
if not self._hasPayloadLengthFirst: # Check if we need to buffer First Payload Length byte
payloadLengthFirst = self._bufferedReader.read(1)
self._hasPayloadLengthFirst = True # Finished buffering first byte of payload length
self._needMaskKey = (payloadLengthFirst[0] & 0x80) == 0x80
payloadLengthFirstByteArray = bytearray()
payloadLengthFirstByteArray.extend(payloadLengthFirst)
self._payloadLength = (payloadLengthFirstByteArray[0] & 0x7f)
if self._payloadLength == 126:
self._payloadLengthBytesLength = 2
self._hasPayloadLengthExtended = False # Force to buffer the extended
elif self._payloadLength == 127:
self._payloadLengthBytesLength = 8
self._hasPayloadLengthExtended = False # Force to buffer the extended
else: # _payloadLength <= 125:
self._hasPayloadLengthExtended = True # No need to buffer extended payload length
if not self._hasPayloadLengthExtended: # Check if we need to buffer Extended Payload Length bytes
payloadLengthExtended = self._bufferedReader.read(self._payloadLengthBytesLength)
self._hasPayloadLengthExtended = True
if sys.version_info[0] < 3:
payloadLengthExtended = str(payloadLengthExtended)
if self._payloadLengthBytesLength == 2:
self._payloadLength = struct.unpack("!H", payloadLengthExtended)[0]
else: # _payloadLengthBytesLength == 8
self._payloadLength = struct.unpack("!Q", payloadLengthExtended)[0]
if self._needMaskKey: # Response from server is masked, close the connection
self._closeWssConnection()
self._connectStatus = self._WebsocketDisconnected
self._payloadDataBuffer = bytearray()
raise socket.error(ssl.SSL_ERROR_WANT_READ, "Server response masked, closing connection and try again.")
if not self._hasPayload: # Check if we need to buffer the payload
payloadForThisFrame = self._bufferedReader.read(self._payloadLength)
self._hasPayload = True
# Client side should never received a masked packet from the server side
# Unmask it as needed
#if self._needMaskKey:
# for i in range(0, self._payloadLength):
# payloadForThisFrame[i] ^= self._maskKey[i % 4]
# Append it to the internal payload buffer
self._payloadDataBuffer.extend(payloadForThisFrame)
# Now we have the complete wss frame, reset the context
# Check to see if it is a wss closing frame
if self._opCode == self._OP_CONNECTION_CLOSE:
self._connectStatus = self._WebsocketDisconnected
self._payloadDataBuffer = bytearray() # Ensure that once the wss closing frame comes, we have nothing to read and start all over again
# Check to see if it is a wss PING frame
if self._opCode == self._OP_PING:
self._sendPONG() # Nothing more to do here, if the transmission of the last wssMQTT packet is not finished, it will continue
self._reset()
# Check again if we have enough data for paho
if len(self._payloadDataBuffer) >= numberOfBytes:
ret = self._payloadDataBuffer[0:numberOfBytes]
self._payloadDataBuffer = self._payloadDataBuffer[numberOfBytes:]
# struct.unpack(fmt, string) # Py2.x
# struct.unpack(fmt, buffer) # Py3.x
# Here ret is always in bytes (buffer interface)
if sys.version_info[0] < 3: # Py2.x
ret = str(ret)
return ret
else: # Fragmented MQTT packets in separate wss frames
raise socket.error(ssl.SSL_ERROR_WANT_READ, "Not a complete MQTT packet payload within this wss frame.")