def on_connection_setup()

in awsiot/eventstreamrpc.py [0:0]


    def on_connection_setup(self, connection, error, **kwargs):
        """Handle the outcome of the socket-level connection attempt.

        If ``error`` is set, the owner's synced state is reset to
        DISCONNECTED and both the closed future and the connect future are
        failed with that error. Otherwise the CONNECT protocol message is
        sent, unless close() was requested while the socket was still being
        established, in which case the new connection is closed right away.

        Args:
            connection: The connection object handed to this callback. It is
                stored as the owner's current connection on the success path.
            error: The failure reason; a falsy value means the socket
                connection was established.
            **kwargs: Ignored by this handler.
        """
        # if error is set, socket connection failed
        if error:
            logger.error("%r failed to establish connection: %r", self.owner, error)
            # Guard the shared state with the same context manager the rest of
            # this handler uses, rather than reaching for the lock attribute.
            with self.owner._synced as synced:
                connect_future = synced.connect_future

                synced.connect_future = None
                synced.current_handler = None
                synced.close_reason = None
                synced.closed_future.set_exception(error)
                synced.state = _ClientState.DISCONNECTED
            # complete future after lock is released
            connect_future.set_exception(error)
            return

        # error is not set, so socket connection is established.
        # next step is to send CONNECT message
        try:
            logger.debug("%r connection established, sending CONNECT message", self.owner)
            # invoke callback outside of holding the lock
            if self.owner._connect_message_amender:
                amendment = self.owner._connect_message_amender()
            else:
                amendment = MessageAmendment()

            with self.owner._synced as synced:
                synced.current_connection = connection
                # check if close() was called before connection established
                if synced.state == _ClientState.DISCONNECTING:
                    logger.debug("%r close() has been called, shutting down", self.owner)
                    connection.close()
                else:
                    headers = [Header.from_string(
                        VERSION_HEADER, VERSION_STRING)]
                    # don't allow amendment to override required headers;
                    # names are compared case-insensitively. The reserved set
                    # is fixed before any amendment header is added, so only
                    # the required headers are protected.
                    reserved_names = {header.name.lower() for header in headers}
                    if amendment.headers:
                        headers.extend(
                            header for header in amendment.headers
                            if header.name.lower() not in reserved_names)

                    connection.send_protocol_message(
                        headers=headers, payload=amendment.payload,
                        message_type=protocol.MessageType.CONNECT)
                    synced.state = _ClientState.WAITING_FOR_CONNECT_ACK
        except Exception as e:
            # Any failure above (including one raised by the amender callback)
            # is recorded as the close reason and the connection is torn down.
            logger.debug("%r failure attempting to send CONNECT: %r", self.owner, e)
            with self.owner._synced as synced:
                synced.state = _ClientState.DISCONNECTING
                synced.current_connection = connection
                synced.close_reason = e
                connection.close()