in CommonLayerCode/datalake-library/python/datalake_library/interfaces/sqs_interface.py [0:0]
def receive_min_max_messages(self, min_items_process, max_items_process):
"""Gets max_items_process messages from an SQS queue.
:param min_items_process: Minimum number of items to process.
:param max_items_process: Maximum number of items to process.
:return messages obtained
"""
messages = []
num_messages_queue = int(self._message_queue.attributes['ApproximateNumberOfMessages'])
# If not enough items to process, break with no messages
if (num_messages_queue==0) or (min_items_process >= num_messages_queue):
return messages
# Only pull batch sizes of max_batch_size
if num_messages_queue > max_items_process:
num_messages_queue = max_items_process
max_batch_size = 10
batch_sizes = [max_batch_size] * math.floor(num_messages_queue/max_batch_size)
if num_messages_queue % max_batch_size > 0:
batch_sizes += [num_messages_queue % max_batch_size]
for batch_size in batch_sizes:
resp_msg = self._message_queue.receive_messages(MaxNumberOfMessages=batch_size)
try:
messages.extend(message.body for message in resp_msg)
for msg in resp_msg:
msg.delete()
except KeyError:
break
return messages