def _return_abandoned_task_to_queue()

in images/airflow/2.10.3/python/mwaa/celery/task_monitor.py [0:0]


    def _return_abandoned_task_to_queue(self, celery_task: CeleryTask):
        """
        Cleanup the abandoned SQS message from the Celery SQS Channel.
        :param celery_task: Celery task (celery command + SQS receipt handle).
        """
        # For calculating behavioural metrics.
        clean_celery_message_error_no_queue = 0
        clean_celery_message_success = 0
        clean_celery_message_error_sqs_op = 0

        logger.info(
            "Cleaning up abandoned SQS message corresponding to task "
            f"state: {celery_task}"
        )
        default_queue_name = os.environ.get(DEFAULT_QUEUE_ENV_KEY)
        celery_queue_details = json.loads(
            os.environ.get(TRANSPORT_OPTIONS_ENV_KEY, "{}")
        ).get(default_queue_name)
        celery_queue_url = (
            celery_queue_details.get("url") if celery_queue_details else None
        )
        if not celery_queue_url:
            logger.info(
                f"Unable to cleanup abandoned SQS message for task state {celery_task}."
                " No default queue found."
            )
            clean_celery_message_error_no_queue += 1

        else:
            sqs: SQSClient = boto3.client(  # type: ignore
                "sqs",
                region_name=os.environ["AWS_REGION"],
                config=BOTO_RETRY_CONFIGURATION,  # type: ignore
            )
            try:
                sqs.change_message_visibility(
                    QueueUrl=celery_queue_url,
                    ReceiptHandle=celery_task["receipt_handle"],
                    VisibilityTimeout=0,
                )
                clean_celery_message_success += 1
            except botocore.exceptions.ClientError as error:  # type: ignore
                logger.info(
                    f"Unable to cleanup abandoned SQS message for task state {celery_task}."
                    f" Error: {error}"
                )
                clean_celery_message_error_sqs_op += 1
            _update_celery_state(
                self.cleanup_celery_state, celery_task, CeleryStateUpdateAction.ADD
            )

        # For calculating behavioural metrics.
        return (
            clean_celery_message_error_no_queue,
            clean_celery_message_success,
            clean_celery_message_error_sqs_op,
        )