in AWSIoTPythonSDK/core/protocol/paho/client.py [0:0]
def _packet_read(self):
    """Read as much of one incoming MQTT packet as is currently available.

    Called when the network loop reports readable data. Parsing state is
    kept in ``self._in_packet`` so a packet may be assembled over several
    calls:

    1. If no command byte is stored yet, read the single fixed-header byte.
    2. If the remaining length is not known yet, decode it one byte at a
       time (7 data bits per byte, high bit set means "more bytes follow").
       This may need more than one byte, so partial progress is saved.
    3. Read the rest of the packet (variable header plus payload), saving
       what has arrived so far and how much is still outstanding.
    4. Once complete, hand the packet to ``self._packet_handle()``, reset
       ``self._in_packet`` and record the time of the last inbound message.

    Returns:
        The value returned by ``self._packet_handle()`` when a full packet
        was processed; ``MQTT_ERR_AGAIN`` when the read would block and the
        call should be retried later; ``MQTT_ERR_PROTOCOL`` when the
        remaining-length field is longer than 4 bytes; ``1`` on any other
        socket error or when a read returns no data (peer closed the
        connection).
    """
    pkt = self._in_packet

    def read_bytes(count):
        # Read up to `count` bytes from the TLS or plain socket.
        # Returns (data, None) on success, or (None, rc) when the caller
        # must stop and return rc.
        try:
            if self._ssl:
                data = self._ssl.read(count)
            else:
                data = self._sock.recv(count)
        except socket.error as err:
            if self._ssl and err.errno in (ssl.SSL_ERROR_WANT_READ,
                                           ssl.SSL_ERROR_WANT_WRITE):
                return None, MQTT_ERR_AGAIN
            if err.errno == EAGAIN:
                return None, MQTT_ERR_AGAIN
            print(err)
            return None, 1
        if len(data) == 0:
            # A zero-length read means the peer closed the connection.
            # Without this check the length decoder would raise
            # struct.error and the payload loop would never terminate.
            return None, 1
        return data, None

    if pkt['command'] == 0:
        data, rc = read_bytes(1)
        if rc is not None:
            return rc
        pkt['command'] = struct.unpack("!B", data)[0]

    if pkt['have_remaining'] == 0:
        # Decode the variable-length "remaining length" field.
        # Algorithm for decoding taken from pseudo code at
        # http://publib.boulder.ibm.com/infocenter/wmbhelp/v6r0m0/topic/com.ibm.etools.mft.doc/ac10870_.htm
        while True:
            data, rc = read_bytes(1)
            if rc is not None:
                return rc
            byte = struct.unpack("!B", data)[0]
            pkt['remaining_count'].append(byte)
            # Max 4 bytes length for remaining length as defined by protocol.
            # Anything more likely means a broken/malicious client.
            if len(pkt['remaining_count']) > 4:
                return MQTT_ERR_PROTOCOL
            pkt['remaining_length'] += (byte & 127) * pkt['remaining_mult']
            pkt['remaining_mult'] *= 128
            if (byte & 128) == 0:
                break
        pkt['have_remaining'] = 1
        pkt['to_process'] = pkt['remaining_length']

    while pkt['to_process'] > 0:
        data, rc = read_bytes(pkt['to_process'])
        if rc is not None:
            return rc
        pkt['to_process'] -= len(data)
        pkt['packet'] += data

    # All data for this packet is read.
    pkt['pos'] = 0
    rc = self._packet_handle()

    # Free data and reset values
    self._in_packet = dict(
        command=0,
        have_remaining=0,
        remaining_count=[],
        remaining_mult=1,
        remaining_length=0,
        packet=b"",
        to_process=0,
        pos=0)

    # `with` guarantees the lock is released even if time.time() raises.
    with self._msgtime_mutex:
        self._last_msg_in = time.time()
    return rc