in src/braket/jobs/logs.py [0:0]
def log_stream(aws_session, log_group, stream_name, start_time=0, skip=0):
    """A generator for log items in a single stream.

    This yields all the items that are available at the current moment.

    Args:
        aws_session (AwsSession): The AwsSession for interfacing with CloudWatch.
        log_group (str): The name of the log group.
        stream_name (str): The name of the specific stream.
        start_time (int): The timestamp value to start reading the logs from. Default: 0.
        skip (int): The number of log entries to skip at the start. Default: 0. This is
            useful when there are multiple entries at the same timestamp.

    Yields:
        Dict: A CloudWatch log event with the following key-value pairs:
            'timestamp' (int): The time of the event.
            'message' (str): The log event data.
            'ingestionTime' (int): The time the event was ingested.
    """
    next_token = None
    event_count = 1
    # Page through the stream until a request returns no events.
    while event_count > 0:
        response = aws_session.get_log_events(
            log_group,
            stream_name,
            start_time,
            start_from_head=True,
            next_token=next_token,
        )
        next_token = response["nextForwardToken"]
        events = response["events"]
        event_count = len(events)
        if event_count > skip:
            # Drop the entries already seen at the same timestamp, then stop skipping.
            events = events[skip:]
            skip = 0
        else:
            # The whole page consists of already-seen entries; skip it entirely.
            skip = skip - event_count
            events = []
        for ev in events:
            yield ev
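
A minimal usage sketch of the generator above, assuming an existing AwsSession from braket.aws; the log group and stream names here are hypothetical placeholders, not values taken from this module:

from braket.aws import AwsSession

session = AwsSession()
for event in log_stream(
    session,
    log_group="/aws/braket/jobs",      # hypothetical log group name
    stream_name="my-job/algo-1-0000",  # hypothetical stream name
):
    # Each yielded item is a CloudWatch log event dict.
    print(event["timestamp"], event["message"])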