in src/asyncproc.py [0:0]
def processItems(qUrl, snsTopic, snsRole):
sqs = AwsHelper().getClient('sqs')
messages = getMessagesFromQueue(sqs, qUrl)
jc = 0
totalMessages = 0
hitLimit = False
limitException = None
if(messages):
totalMessages = len(messages)
print("Total messages: {}".format(totalMessages))
for message in messages:
receipt_handle = message['ReceiptHandle']
try:
if(hitLimit):
changeVisibility(sqs, qUrl, receipt_handle)
else:
print("starting job...")
processItem(message, snsTopic, snsRole)
print("started job...")
print('Deleting item from queue...')
# Delete received message from queue
sqs.delete_message(
QueueUrl=qUrl,
ReceiptHandle=receipt_handle
)
print('Deleted item from queue...')
jc += 1
except Exception as e:
print("Error while starting job or deleting from queue: {}".format(e))
changeVisibility(sqs, qUrl, receipt_handle)
if(e.__class__.__name__ == 'LimitExceededException'
or e.__class__.__name__ == "ProvisionedThroughputExceededException"):
hitLimit = True
limitException = e
if(hitLimit):
raise limitException
return totalMessages, jc