in AWSIoTPythonSDK/core/protocol/paho/client.py [0:0]
def loop(self, timeout=1.0, max_packets=1):
    """Process network events.

    This function must be called regularly to ensure communication with the
    broker is carried out. It calls select() on the network socket to wait
    for network events. If incoming data is present it is handed to
    loop_read(). Outgoing commands, from e.g. publish(), are normally sent
    immediately that their function is called, but this is not always
    possible, so loop() also calls loop_write() to send any remaining
    outgoing data when the socket is writable or when it has been woken up
    through the internal socket pair.

    timeout: The time in seconds to wait for incoming/outgoing network
        traffic before timing out and returning. Forced to 0 when the SSL
        socket reports already-buffered bytes.
    max_packets: Forwarded unchanged to loop_read() and loop_write().

    Returns the result of loop_misc() when reading and writing succeed.
    Returns the non-zero result of loop_read()/loop_write() if either
    reports one, or their result if the connection is gone afterwards.
    Returns MQTT_ERR_CONN_LOST if select() rejects the sockets
    (TypeError/ValueError) and MQTT_ERR_UNKNOWN on any other Exception
    raised by select().

    A ValueError will be raised if timeout < 0. KeyboardInterrupt and
    SystemExit raised while waiting in select() are propagated."""
    if timeout < 0.0:
        raise ValueError('Invalid timeout.')

    # Promote the next queued packet to "current" and decide whether we
    # need to wait for writability. The `with` form keeps the original
    # acquisition order and guarantees both locks are released even if
    # something in the critical section raises.
    with self._current_out_packet_mutex, self._out_packet_mutex:
        if self._current_out_packet is None and self._out_packet:
            self._current_out_packet = self._out_packet.pop(0)

        if self._current_out_packet:
            wlist = [self.socket()]
        else:
            wlist = []

    # used to check if there are any bytes left in the ssl socket
    pending_bytes = 0
    if self._ssl:
        pending_bytes = self.socket().pending()

    # if bytes are pending do not wait in select
    if pending_bytes > 0:
        timeout = 0.0

    # sockpairR is used to break out of select() before the timeout, on a
    # call to publish() etc.
    rlist = [self.socket(), self._sockpairR]
    try:
        socklist = select.select(rlist, wlist, [], timeout)
    except (TypeError, ValueError):
        # TypeError: socket isn't the correct type, in likelihood the
        # connection is lost.
        # ValueError: can occur if we just reconnected but rlist/wlist
        # contain a -1 for some reason.
        return MQTT_ERR_CONN_LOST
    except Exception:
        # Deliberately not a bare except: KeyboardInterrupt and SystemExit
        # do not derive from Exception and must still be able to terminate
        # the caller.
        return MQTT_ERR_UNKNOWN

    readable, writable = socklist[0], socklist[1]

    if self.socket() in readable or pending_bytes > 0:
        rc = self.loop_read(max_packets)
        if rc or (self._ssl is None and self._sock is None):
            return rc

    if self._sockpairR in readable:
        # Stimulate output write even though we didn't ask for it, because
        # at that point the publish or other command wasn't present.
        writable.insert(0, self.socket())
        # Clear sockpairR - only ever a single byte written.
        try:
            self._sockpairR.recv(1)
        except socket.error as err:
            # EAGAIN just means there was nothing to drain.
            if err.errno != EAGAIN:
                raise

    if self.socket() in writable:
        rc = self.loop_write(max_packets)
        if rc or (self._ssl is None and self._sock is None):
            return rc

    return self.loop_misc()