in images/airflow/2.10.3/python/mwaa/celery/sqs_broker.py [0:0]
def basic_ack(self, delivery_tag, multiple=False):
    """Acknowledge a Celery message and delete its backing SQS message.

    Looks up the delivery info recorded for ``delivery_tag`` in the QoS
    state. If no SQS metadata is found (``KeyError`` from the QoS lookup or
    from the missing ``"sqs_message"`` key), falls back to the default
    ``basic_ack``. Otherwise, removes the task from the internal Celery
    state, deletes the message from the SQS queue, and acknowledges it;
    if the SQS deletion fails with a ``ClientError``, the message is
    rejected via ``basic_reject`` instead.

    NOTE(review): ``multiple`` is accepted for interface compatibility with
    the parent class but is not used in this override — presumably
    multiple-ack is unsupported here; confirm against the base transport.
    """
    try:
        # Delivery info was attached when the message was received; it
        # carries the raw SQS message and the queue URL needed to delete it.
        message = self.qos.get(delivery_tag).delivery_info
        sqs_message = message["sqs_message"]
    except KeyError:
        # No SQS metadata for this tag — defer to the stock ack behavior.
        super().basic_ack(delivery_tag)
    else:
        # Resolve the queue name (if routed) so self.sqs() picks the right
        # client/connection for the delete call below.
        queue = None
        if "routing_key" in message:
            queue = self.canonical_queue_name(message["routing_key"])
        try:
            # 2022-11-25: Amazon addition.
            # This code is executed when a task finishes execution and
            # thus its message can be removed from the SQS queue. We thus
            # report a task_executed metric.
            Stats.incr("mwaa.celery.task_executed", 1)
            should_remove_sqs_message = True
            # Pair of (task command, receipt handle) identifying this task
            # both in the internal state and in SQS.
            celery_task_tuple = (
                self._get_task_command_from_sqs_message(sqs_message["Body"]),
                sqs_message["ReceiptHandle"],
            )
            if self.abandoned_messages_test_enabled:
                # In order to test for abandoned tasks, we will not remove the message from the SQS queue in 50% of the cases.
                # This will cause the cleanup process to treat the corresponding SQS message as abandoned and the message will be
                # returned to the queue by setting its visibility timeout to zero.
                # Also checking that the message data should be present in the internal state (memory blocks) because the
                # abandoned SQS message test scenario and undead Airflow process test scenario are not possible to occur
                # simultaneously based on definition.
                import random
                celery_task = {
                    "command": celery_task_tuple[0],
                    "receipt_handle": celery_task_tuple[1],
                }
                celery_task_index = self._get_celery_task_index(
                    celery_task, self._get_tasks_from_state(self.celery_state)
                )
                # Skip deletion ~50% of the time, but only if the task is
                # actually tracked in the internal state (index != -1).
                if random.randint(0, 9) < 5 and celery_task_index != -1:
                    should_remove_sqs_message = False
            if should_remove_sqs_message:
                # Drop the task from internal state first, then delete the
                # SQS message so it cannot be redelivered.
                self._update_state_with_tasks(
                    [celery_task_tuple], self.CeleryStateUpdateAction.REMOVE
                )
                self.sqs(queue=queue).delete_message(
                    QueueUrl=message["sqs_queue"],
                    ReceiptHandle=sqs_message["ReceiptHandle"],
                )
            # End of Amazon addition.
        except ClientError:
            # SQS delete failed — reject so the broker can handle redelivery.
            super().basic_reject(delivery_tag)
        else:
            # Deletion (or deliberate skip) succeeded — acknowledge normally.
            super().basic_ack(delivery_tag)