def basic_ack()

in images/airflow/2.10.1/python/mwaa/celery/sqs_broker.py [0:0]


    def basic_ack(self, delivery_tag, multiple=False):
        """Acknowledge a delivery and, when it is SQS-backed, delete its SQS message.

        Deliveries whose ``delivery_info`` has no ``"sqs_message"`` entry (or
        whose lookup raises ``KeyError``) are acknowledged through the parent
        class only. Otherwise a ``mwaa.celery.task_executed`` metric is
        emitted, the task is removed from the tracked Celery state, and the
        SQS message is deleted before the parent acknowledgement runs.

        Args:
            delivery_tag: Tag identifying the delivery to acknowledge.
            multiple: Not used by this implementation.

        A ``ClientError`` raised while handling the SQS message results in the
        delivery being rejected via the parent class instead of acknowledged.
        """
        try:
            delivery_info = self.qos.get(delivery_tag).delivery_info
            sqs_message = delivery_info["sqs_message"]
        except KeyError:
            # Nothing SQS-specific to clean up; defer entirely to the parent.
            super().basic_ack(delivery_tag)
            return

        queue = (
            self.canonical_queue_name(delivery_info["routing_key"])
            if "routing_key" in delivery_info
            else None
        )

        try:
            # 2022-11-25: Amazon addition.
            # Reaching this point means a task has finished executing, so its
            # message is eligible for removal from the SQS queue; record that
            # with a task_executed metric.
            Stats.incr("mwaa.celery.task_executed", 1)

            task_command = self._get_task_command_from_sqs_message(
                sqs_message["Body"]
            )
            receipt_handle = sqs_message["ReceiptHandle"]
            celery_task_tuple = (task_command, receipt_handle)

            keep_for_abandon_test = False
            if self.abandoned_messages_test_enabled:
                # Abandoned-message test mode: in 50% of the cases the SQS
                # message is deliberately left in place so that the cleanup
                # process treats it as abandoned and returns it to the queue
                # by setting its visibility timeout to zero. The task must
                # also be present in the internal state (memory blocks),
                # because the abandoned-SQS-message scenario and the undead
                # Airflow process scenario cannot, by definition, occur at
                # the same time.
                import random

                task_index = self._get_celery_task_index(
                    {"command": task_command, "receipt_handle": receipt_handle},
                    self._get_tasks_from_state(self.celery_state),
                )
                # The random draw happens unconditionally, before the index
                # check, so the RNG is consumed once per acknowledged task.
                coin_flip = random.randint(0, 9) < 5
                keep_for_abandon_test = coin_flip and task_index != -1

            if not keep_for_abandon_test:
                # Drop the task from the tracked state first, then delete the
                # message from SQS.
                self._update_state_with_tasks(
                    [celery_task_tuple], self.CeleryStateUpdateAction.REMOVE
                )
                sqs_client = self.sqs(queue=queue)
                sqs_client.delete_message(
                    QueueUrl=delivery_info["sqs_queue"],
                    ReceiptHandle=sqs_message["ReceiptHandle"],
                )
            # End of Amazon addition.
        except ClientError:
            super().basic_reject(delivery_tag)
            return

        super().basic_ack(delivery_tag)