in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]
def _handle_sts_session(self, queue, q):
if not hasattr(self, "sts_expiration"): # STS token - token init
sts_creds = self.generate_sts_session_token(
self.transport_options.get("sts_role_arn"),
self.transport_options.get("sts_token_timeout", 900),
)
self.sts_expiration = sts_creds["Expiration"]
c = self._predefined_queue_clients[queue] = self.new_sqs_client(
region=q.get("region", self.region),
access_key_id=sts_creds["AccessKeyId"],
secret_access_key=sts_creds["SecretAccessKey"],
session_token=sts_creds["SessionToken"],
)
return c
# STS token - refresh if expired
elif self.sts_expiration.replace(tzinfo=None) < datetime.utcnow():
sts_creds = self.generate_sts_session_token(
self.transport_options.get("sts_role_arn"),
self.transport_options.get("sts_token_timeout", 900),
)
self.sts_expiration = sts_creds["Expiration"]
c = self._predefined_queue_clients[queue] = self.new_sqs_client(
region=q.get("region", self.region),
access_key_id=sts_creds["AccessKeyId"],
secret_access_key=sts_creds["SecretAccessKey"],
session_token=sts_creds["SessionToken"],
)
return c
else: # STS token - ruse existing
return self._predefined_queue_clients[queue]