in awsiot/eventstreamrpc.py [0:0]
def on_protocol_message(self, headers, payload, message_type, flags, **kwargs):
try:
logger.debug("%r received %s headers=%s", self.owner, message_type.name, headers)
# protocol enforces that CONNECT_ACK is first msg received
if message_type == protocol.MessageType.CONNECT_ACK:
connect_future = None
with self.owner._synced as synced:
if synced.state == _ClientState.WAITING_FOR_CONNECT_ACK:
if (flags & protocol.MessageFlag.CONNECTION_ACCEPTED):
connect_future = synced.connect_future
synced.connect_future = None
synced.state = _ClientState.CONNECTED
else:
synced.state = _ClientState.DISCONNECTING
synced.close_reason = AccessDeniedError(
"Connection access denied to event stream RPC server")
synced.current_connection.close()
# complete future and invoke callback after lock is released
if connect_future:
logger.info("%r connected", self.owner)
connect_future.set_result(None)
self.lifecycle_handler.on_connect()
elif message_type == protocol.MessageType.PING_RESPONSE:
pass
elif message_type == protocol.MessageType.PING:
self.lifecycle_handler.on_ping(headers, payload)
elif message_type in (protocol.MessageType.PROTOCOL_ERROR, protocol.MessageType.INTERNAL_ERROR):
error = EventStreamError(message_type, headers, payload)
# If callback returns True (or forgets to return a value)
# then close connection due to error
return_val = self.lifecycle_handler.on_error(error)
if return_val or return_val is None:
self.owner.close(error)
except Exception as e:
logger.error("%r closing due to exception from LifecycleHandler callback: %r", self.owner, e)
self.owner.close(e)