def loop_over_stream()

in greengrass-v2/poll-api/artifacts/com.greengrass.GGUtils/1.0.0/GGUtils.py [0:0]


def loop_over_stream(stream_name, next_sequence_number, min_message_count=1, max_message_count=100,
                     processing_function=None):
    """Read one batch of messages from a stream and pass it to a callback.

    The sequence number is first passed through ``validate_sequence_number``,
    then a batch is read with ``read_messages_from_stream`` and handed to
    ``processing_function`` as the keyword argument ``message_list``. After the
    callback returns, the highest sequence number in the batch is passed to
    ``update_sequence_number``.

    Args:
        stream_name: Name of the stream to read from.
        next_sequence_number: Sequence number to start reading at; it is
            validated before use.
        min_message_count: Forwarded to ``read_messages_from_stream``.
        max_message_count: Forwarded to ``read_messages_from_stream``.
        processing_function: Callable invoked as
            ``processing_function(message_list=...)``. Required despite the
            ``None`` default.

    Returns:
        Whatever ``update_sequence_number`` returns on success. ``None`` when
        the stream does not exist yet or when not enough messages are waiting;
        callers are expected to retry later in both cases.

    Raises:
        ValueError: If ``processing_function`` is ``None``.
    """
    if processing_function is None:
        # ValueError subclasses Exception, so callers that catch Exception
        # keep working while new callers can catch the misuse specifically.
        raise ValueError("No processing function provided")

    try:
        next_sequence_number = validate_sequence_number(stream_name, next_sequence_number)

        message_list = read_messages_from_stream(stream_name, next_sequence_number,
                                                 min_message_count=min_message_count,
                                                 max_message_count=max_message_count)

        # Process the list of messages we received. This call stays inside the
        # try block on purpose: errors raised by the callback are routed
        # through the handlers below, including exit_on_exception.
        processing_function(message_list=message_list)

        # Persist the next sequence number so we can pick up from where we left off
        _, max_sequence_number = get_min_and_max_sequence_numbers(message_list)
        return update_sequence_number(max_sequence_number)
    except ResourceNotFoundException:
        # Try again on the next invocation
        logger.error("The stream does not exist yet. Trying again in a few seconds.")

        # Reset the sequence number
        reset_sequence_number()
    except NotEnoughMessagesException:
        # Nothing to process yet; fall through and return None.
        logger.info("No messages waiting. Trying again in a few seconds.")
    except asyncio.TimeoutError:
        # NOTE(review): presumably terminates the process - confirm in exit_on_timeout.
        exit_on_timeout()
    except Exception as e:
        # Catch-all boundary: any other failure, including one from the
        # callback, is handed to exit_on_exception.
        exit_on_exception(e)