in images/airflow/2.9.2/python/mwaa/celery/sqs_broker.py [0:0]
def _get_bulk(self, queue, max_if_unlimited=SQS_MAX_MESSAGES, callback=None):
"""Try to retrieve multiple messages off ``queue``.
Where :meth:`_get` returns a single Payload object, this method
returns a list of Payload objects. The number of objects returned
is determined by the total number of messages available in the queue
and the number of messages the QoS object allows (based on the
prefetch_count).
Note:
Ignores QoS limits so caller is responsible for checking
that we are allowed to consume at least one message from the
queue. get_bulk will then ask QoS for an estimate of
the number of extra messages that we can consume.
Arguments:
queue (str): The queue name to pull from.
Returns:
List[Message]
"""
# drain_events calls `can_consume` first, consuming
# a token, so we know that we are allowed to consume at least
# one message.
# Note: ignoring max_messages for SQS with boto3
max_count = self._get_message_estimate()
if max_count:
q_url = self._new_queue(queue)
# 2022-11-25: Amazon addition.
# Send a heartbeat metric each time we try to receive messages from SQS.
# I didn't notice this branch being executed as it seems that async client
# is used in our case, but adding this nevertheless to be future-proof.
Stats.incr("mwaa.celery.heartbeat", 1)
# End of Amazon addition.
resp = self.sqs(queue=queue).receive_message(
QueueUrl=q_url,
MaxNumberOfMessages=max_count,
WaitTimeSeconds=self.wait_time_seconds,
)
if resp.get("Messages"):
# 2022-11-25: Amazon addition.
# Since we pulled some messages from the SQS queue, we report
# a metric indicating the number of tasks pulled for execution.
# I didn't notice this branch being executed as it seems that
# async client is used in our case, but adding this nevertheless
# to be future-proof.
Stats.incr("mwaa.celery.task_pulled", len(resp.get("Messages")))
celery_task_tuples = []
# End of Amazon addition.
for m in resp["Messages"]:
celery_task_tuples.append(
(
self._get_task_command_from_sqs_message(m["Body"]),
m["ReceiptHandle"],
)
)
m["Body"] = AsyncMessage(body=m["Body"]).decode()
self._update_state_with_tasks(
celery_task_tuples, self.CeleryStateUpdateAction.ADD
)
for msg in self._messages_to_python(resp["Messages"], queue):
self.connection._deliver(msg, queue)
return
raise Empty()