in greengrass-v2/poll-api/artifacts/com.greengrass.GGUtils/1.0.0/GGUtils.py [0:0]
def loop_over_stream(stream_name, next_sequence_number, min_message_count=1, max_message_count=100,
processing_function=None):
if processing_function is None:
raise Exception("No processing function provided")
try:
next_sequence_number = validate_sequence_number(stream_name, next_sequence_number)
message_list = read_messages_from_stream(stream_name, next_sequence_number,
min_message_count=min_message_count,
max_message_count=max_message_count)
# Process the list of messages we received
processing_function(message_list=message_list)
# Persist the next sequence number so we can pick up from where we left off
_, max_sequence_number = get_min_and_max_sequence_numbers(message_list)
return update_sequence_number(max_sequence_number)
except ResourceNotFoundException:
# Try again on the next invocation
logger.error("The stream does not exist yet. Trying again in a few seconds.")
# Reset the sequence number
reset_sequence_number()
except NotEnoughMessagesException:
logger.info("No messages waiting. Trying again in a few seconds.")
except asyncio.TimeoutError:
exit_on_timeout()
except Exception as e:
exit_on_exception(e)