def on_protocol_message()

in awsiot/eventstreamrpc.py [0:0]


    def on_protocol_message(self, headers, payload, message_type, flags, **kwargs):
        try:
            logger.debug("%r received %s headers=%s", self.owner, message_type.name, headers)

            # protocol enforces that CONNECT_ACK is first msg received
            if message_type == protocol.MessageType.CONNECT_ACK:
                connect_future = None
                with self.owner._synced as synced:
                    if synced.state == _ClientState.WAITING_FOR_CONNECT_ACK:
                        if (flags & protocol.MessageFlag.CONNECTION_ACCEPTED):
                            connect_future = synced.connect_future
                            synced.connect_future = None
                            synced.state = _ClientState.CONNECTED
                        else:
                            synced.state = _ClientState.DISCONNECTING
                            synced.close_reason = AccessDeniedError(
                                "Connection access denied to event stream RPC server")
                            synced.current_connection.close()
                # complete future and invoke callback after lock is released
                if connect_future:
                    logger.info("%r connected", self.owner)
                    connect_future.set_result(None)
                    self.lifecycle_handler.on_connect()
            elif message_type == protocol.MessageType.PING_RESPONSE:
                pass
            elif message_type == protocol.MessageType.PING:
                self.lifecycle_handler.on_ping(headers, payload)
            elif message_type in (protocol.MessageType.PROTOCOL_ERROR, protocol.MessageType.INTERNAL_ERROR):
                error = EventStreamError(message_type, headers, payload)
                # If callback returns True (or forgets to return a value)
                # then close connection due to error
                return_val = self.lifecycle_handler.on_error(error)
                if return_val or return_val is None:
                    self.owner.close(error)
        except Exception as e:
            logger.error("%r closing due to exception from LifecycleHandler callback: %r", self.owner, e)
            self.owner.close(e)