in awsiot/eventstreamrpc.py [0:0]
def on_connection_setup(self, connection, error, **kwargs):
# if error is set, socket connection failed
if error:
logger.error("%r failed to establish connection: %r", self.owner, error)
synced = self.owner._synced
with synced.lock:
connect_future = synced.connect_future
synced.connect_future = None
synced.current_handler = None
synced.close_reason = None
synced.closed_future.set_exception(error)
synced.state = _ClientState.DISCONNECTED
# complete future after lock is released
connect_future.set_exception(error)
return
# error is not set, so socket connection is established.
# next step is to send CONNECT message
try:
logger.debug("%r connection established, sending CONNECT message", self.owner)
# invoke callback outside of holding the lock
if self.owner._connect_message_amender:
amendment = self.owner._connect_message_amender()
else:
amendment = MessageAmendment()
with self.owner._synced as synced:
synced.current_connection = connection
# check if close() was called before connection established
if synced.state == _ClientState.DISCONNECTING:
logger.debug("%r close() has been called, shutting down", self.owner)
connection.close()
else:
headers = [Header.from_string(
VERSION_HEADER, VERSION_STRING)]
# don't allow amendment to override required headers
existing_names = [header.name.lower() for header in headers]
if amendment.headers:
for header in amendment.headers:
if header.name.lower() not in existing_names:
headers.append(header)
connection.send_protocol_message(
headers=headers, payload=amendment.payload,
message_type=protocol.MessageType.CONNECT)
synced.state = _ClientState.WAITING_FOR_CONNECT_ACK
except Exception as e:
logger.debug("%r failure attempting to send CONNECT: %r", self.owner, e)
with self.owner._synced as synced:
synced.state = _ClientState.DISCONNECTING
synced.current_connection = connection
synced.close_reason = e
connection.close()