in AWSIoTPythonSDK/core/protocol/paho/client.py [0:0]
def _handle_connack(self):
    """Process the CONNACK packet currently held in ``self._in_packet``.

    Steps, in order:

    1. Validate that the packet body is exactly two bytes and unpack it
       into ``flags`` and ``result``.
    2. If the result is CONNACK_REFUSED_PROTOCOL_VERSION while the client
       is using MQTTv311, switch to MQTTv31 and reconnect.
    3. On ``result == 0`` mark the client state as connected.
    4. Invoke ``self.on_connect`` (if set) while holding
       ``self._callback_mutex``.
    5. Start the stable-connection timer on ``self._backoffCore``.
    6. On ``result == 0`` walk ``self._out_messages`` while holding
       ``self._out_message_mutex`` and (re)send pending messages.

    Both mutexes are released, and ``self._in_callback`` is reset, even
    when the user callback or a send helper raises; the exception itself
    still propagates to the caller.

    Returns:
        MQTT_ERR_PROTOCOL if the packet is malformed or ``result`` is not
        in the range 0-5; the return value of ``self.reconnect()`` when a
        protocol downgrade is attempted; MQTT_ERR_CONN_REFUSED when
        ``result`` is 1-5; otherwise, for ``result == 0``,
        MQTT_ERR_SUCCESS as soon as a message in state ``mqtt_ms_queued``
        is reached, the first non-zero code returned by a send helper, or
        the code of the last send (0 when nothing was sent).
    """
    if self._strict_protocol:
        if self._in_packet['remaining_length'] != 2:
            return MQTT_ERR_PROTOCOL

    # struct.unpack("!BB", ...) below needs exactly two bytes.
    if len(self._in_packet['packet']) != 2:
        return MQTT_ERR_PROTOCOL

    (flags, result) = struct.unpack("!BB", self._in_packet['packet'])
    if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
        self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.")
        # Downgrade to MQTT v3.1
        self._protocol = MQTTv31
        return self.reconnect()

    if result == 0:
        self._state = mqtt_cs_connected

    self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")

    self._callback_mutex.acquire()
    try:
        if self.on_connect:
            self._in_callback = True
            try:
                # The callback may take 3 or 4 arguments; inspect its
                # code object to decide which form to call.
                if sys.version_info[0] < 3:
                    argcount = self.on_connect.func_code.co_argcount
                else:
                    argcount = self.on_connect.__code__.co_argcount

                if argcount == 3:
                    self.on_connect(self, self._userdata, result)
                else:
                    flags_dict = {'session present': flags & 0x01}
                    self.on_connect(self, self._userdata, flags_dict, result)
            finally:
                # Reset even if the user callback raised.
                self._in_callback = False
    finally:
        # Never leave the callback mutex held after an exception.
        self._callback_mutex.release()

    # Start counting for stable connection
    self._backoffCore.startStableConnectionTimer()

    if result == 0:
        def send_without_loop_write(send, *args):
            # _in_callback is raised for the duration of the send so that
            # loop_write is not called after _send_publish()/_send_pubrel().
            self._in_callback = True
            try:
                return send(*args)
            finally:
                self._in_callback = False

        rc = 0
        self._out_message_mutex.acquire()
        try:
            for m in self._out_messages:
                m.timestamp = time.time()
                if m.state == mqtt_ms_queued:
                    self.loop_write()  # Process outgoing messages that have just been queued up
                    return MQTT_ERR_SUCCESS

                if m.qos == 0:
                    rc = send_without_loop_write(
                        self._send_publish,
                        m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                    if rc != 0:
                        return rc
                elif m.qos == 1:
                    if m.state == mqtt_ms_publish:
                        self._inflight_messages = self._inflight_messages + 1
                        m.state = mqtt_ms_wait_for_puback
                        rc = send_without_loop_write(
                            self._send_publish,
                            m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                        if rc != 0:
                            return rc
                elif m.qos == 2:
                    if m.state == mqtt_ms_publish:
                        self._inflight_messages = self._inflight_messages + 1
                        m.state = mqtt_ms_wait_for_pubrec
                        rc = send_without_loop_write(
                            self._send_publish,
                            m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                        if rc != 0:
                            return rc
                    elif m.state == mqtt_ms_resend_pubrel:
                        self._inflight_messages = self._inflight_messages + 1
                        m.state = mqtt_ms_wait_for_pubcomp
                        rc = send_without_loop_write(
                            self._send_pubrel, m.mid, m.dup)
                        if rc != 0:
                            return rc
                self.loop_write()  # Process outgoing messages that have just been queued up
            return rc
        finally:
            # Single release point covering every return path above as
            # well as exceptions raised by the send helpers or loop_write.
            self._out_message_mutex.release()
    elif 0 < result < 6:
        return MQTT_ERR_CONN_REFUSED
    else:
        return MQTT_ERR_PROTOCOL