in images/airflow/2.10.1/python/mwaa/celery/task_monitor.py [0:0]
def _return_abandoned_task_to_queue(self, celery_task: CeleryTask):
    """
    Make the SQS message of an abandoned Celery task receivable again.

    The URL of the default Celery queue is looked up in the transport options
    stored in the environment. When a URL is found, the visibility timeout of
    the task's SQS message is reset to zero, which in SQS makes the message
    immediately available to consumers again. Afterwards the task is added to
    the cleanup Celery state.

    Only ``botocore`` ``ClientError`` is handled here; any other exception
    (for example a missing ``AWS_REGION`` environment variable) propagates.

    :param celery_task: Celery task (celery command + SQS receipt handle).
    :returns: Behavioural-metric counters as a tuple, in the order
        (no-queue errors, successes, SQS operation errors).
    """
    logger.info(
        "Cleaning up abandoned SQS message corresponding to task "
        f"state: {celery_task}"
    )

    # Resolve the URL of the default queue from the environment configuration.
    queue_name = os.environ.get(DEFAULT_QUEUE_ENV_KEY)
    transport_options = json.loads(os.environ.get(TRANSPORT_OPTIONS_ENV_KEY, "{}"))
    queue_details = transport_options.get(queue_name)
    queue_url = (queue_details or {}).get("url")

    # Behavioural metrics: at most one of these is bumped per call.
    no_queue_errors = 0
    successes = 0
    sqs_op_errors = 0

    if queue_url:
        sqs: SQSClient = boto3.client(  # type: ignore
            "sqs",
            region_name=os.environ["AWS_REGION"],
            config=BOTO_RETRY_CONFIGURATION,  # type: ignore
        )
        try:
            # A visibility timeout of zero hands the message back to the queue.
            sqs.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=celery_task["receipt_handle"],
                VisibilityTimeout=0,
            )
        except botocore.exceptions.ClientError as error:  # type: ignore
            logger.info(
                f"Unable to cleanup abandoned SQS message for task state {celery_task}."
                f" Error: {error}"
            )
            sqs_op_errors += 1
        else:
            successes += 1
    else:
        logger.info(
            f"Unable to cleanup abandoned SQS message for task state {celery_task}."
            " No default queue found."
        )
        no_queue_errors += 1

    # Record the task in the cleanup state whatever the outcome above was.
    _update_celery_state(
        self.cleanup_celery_state, celery_task, CeleryStateUpdateAction.ADD
    )

    return (no_queue_errors, successes, sqs_op_errors)