in AWSIoTPythonSDK/core/protocol/paho/client.py [0:0]
def _packet_write(self):
    """Write queued outgoing packets to the network connection.

    Starting with ``self._current_out_packet``, the unsent remainder of each
    packet is written to ``self._ssl`` (when set) or ``self._sock``.  Once a
    packet has been written completely the next entry of
    ``self._out_packet`` becomes the current packet, until none are left.

    When a packet completes:

    * a PUBLISH with QoS 0 triggers ``self.on_publish`` (if set) with the
      packet's message id;
    * a DISCONNECT records the send time, triggers ``self.on_disconnect``
      (if set) with result code 0, closes and clears ``self._ssl`` and
      ``self._sock``, and returns immediately.

    Every lock taken here is released through ``finally`` so that an
    exception raised by a user callback cannot leave a mutex held (which
    would block every later write) or leave ``self._in_callback`` stuck at
    True.  The exception itself still propagates to the caller.

    Returns:
        MQTT_ERR_SUCCESS if all pending packets were written, if a
        DISCONNECT was sent, or if the write raised AttributeError
        (presumably because the socket has already been torn down);
        MQTT_ERR_AGAIN if the socket reported that the write would block;
        1 for any other socket error, after printing it.
    """
    self._current_out_packet_mutex.acquire()
    # Tracks whether this call still owns _current_out_packet_mutex; some
    # paths release it early, and the outer finally must not release twice.
    holding_out_lock = True
    try:
        while self._current_out_packet:
            packet = self._current_out_packet

            try:
                if self._ssl:
                    write_length = self._ssl.write(packet['packet'][packet['pos']:])
                else:
                    write_length = self._sock.send(packet['packet'][packet['pos']:])
            except AttributeError:
                # The outer finally releases the out-packet mutex.
                return MQTT_ERR_SUCCESS
            except socket.error as err:
                # Release before inspecting/reporting the error, as the
                # original code did.
                self._current_out_packet_mutex.release()
                holding_out_lock = False
                if self._ssl and (err.errno == ssl.SSL_ERROR_WANT_READ or err.errno == ssl.SSL_ERROR_WANT_WRITE):
                    return MQTT_ERR_AGAIN
                if err.errno == EAGAIN:
                    return MQTT_ERR_AGAIN
                print(err)
                return 1

            if write_length > 0:
                packet['to_process'] = packet['to_process'] - write_length
                packet['pos'] = packet['pos'] + write_length

                if packet['to_process'] == 0:
                    if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0:
                        # QoS 0 publishes get no acknowledgement, so the
                        # publish callback fires as soon as the bytes are out.
                        self._callback_mutex.acquire()
                        try:
                            if self.on_publish:
                                self._in_callback = True
                                try:
                                    self.on_publish(self, self._userdata, packet['mid'])
                                finally:
                                    self._in_callback = False
                        finally:
                            self._callback_mutex.release()

                    if (packet['command'] & 0xF0) == DISCONNECT:
                        # The out-packet mutex is dropped before the
                        # disconnect callback runs.
                        self._current_out_packet_mutex.release()
                        holding_out_lock = False

                        self._msgtime_mutex.acquire()
                        try:
                            self._last_msg_out = time.time()
                        finally:
                            self._msgtime_mutex.release()

                        self._callback_mutex.acquire()
                        try:
                            if self.on_disconnect:
                                self._in_callback = True
                                try:
                                    self.on_disconnect(self, self._userdata, 0)
                                finally:
                                    self._in_callback = False
                        finally:
                            self._callback_mutex.release()

                        if self._ssl:
                            self._ssl.close()
                            self._ssl = None
                        if self._sock:
                            self._sock.close()
                            self._sock = None

                        return MQTT_ERR_SUCCESS

                    # Current packet fully sent: advance to the next queued
                    # packet, or clear the slot so the loop terminates.
                    self._out_packet_mutex.acquire()
                    try:
                        if len(self._out_packet) > 0:
                            self._current_out_packet = self._out_packet.pop(0)
                        else:
                            self._current_out_packet = None
                    finally:
                        self._out_packet_mutex.release()
            else:
                # NOTE(review): a write that reports 0 bytes is retried
                # immediately by the loop; confirm whether this can spin.
                pass  # FIXME
    finally:
        if holding_out_lock:
            self._current_out_packet_mutex.release()

    self._msgtime_mutex.acquire()
    try:
        self._last_msg_out = time.time()
    finally:
        self._msgtime_mutex.release()

    return MQTT_ERR_SUCCESS