def _handle_sts_session()

in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]


    def _handle_sts_session(self, queue, q):
        """Return an SQS client for ``queue`` backed by STS session credentials.

        A new STS session token is generated, and a new client is built and
        cached in ``self._predefined_queue_clients``, when any of these hold:

        * no token has been generated yet (``self.sts_expiration`` is unset);
        * the recorded expiration is earlier than the current UTC time;
        * the token is still valid but no client is cached for ``queue``.

        Otherwise the client already cached for ``queue`` is returned.

        Args:
            queue: Key under which the client is cached.
            q: Mapping of per-queue settings; only its optional ``"region"``
                entry is read, falling back to ``self.region``.

        Returns:
            The cached or newly created client from ``self.new_sqs_client``.
        """
        token_is_valid = (
            hasattr(self, "sts_expiration")
            # tzinfo is dropped so the value can be compared with the naive
            # datetime returned by utcnow().
            # NOTE(review): this assumes the expiration is expressed in UTC.
            and self.sts_expiration.replace(tzinfo=None) >= datetime.utcnow()
        )

        # STS token - reuse existing client.
        # The expiration is tracked once for all queues while clients are
        # cached per queue, so a valid token does not imply that a client
        # exists for *this* queue; a plain dict lookup here used to raise
        # KeyError for every queue after the first one.
        if token_is_valid and queue in self._predefined_queue_clients:
            return self._predefined_queue_clients[queue]

        # STS token - init, refresh after expiry, or first use of this queue.
        sts_creds = self.generate_sts_session_token(
            self.transport_options.get("sts_role_arn"),
            self.transport_options.get("sts_token_timeout", 900),
        )
        self.sts_expiration = sts_creds["Expiration"]
        # NOTE(review): only the client for ``queue`` is replaced; clients
        # cached for other queues keep the credentials they were built with.
        # Confirm whether they should be invalidated when the token refreshes.
        client = self.new_sqs_client(
            region=q.get("region", self.region),
            access_key_id=sts_creds["AccessKeyId"],
            secret_access_key=sts_creds["SecretAccessKey"],
            session_token=sts_creds["SessionToken"],
        )
        self._predefined_queue_clients[queue] = client
        return client