in sagemaker_studio/package/package/sagemaker/logs.py [0:0]
def log_stream(client, log_group, stream_name, position):
"""A generator for log items in a single stream. This will yield all the
items that are available at the current moment.
Args:
client (boto3.CloudWatchLogs.Client): The Boto client for CloudWatch logs.
log_group (str): The name of the log group.
stream_name (str): The name of the specific stream.
position (Position): A tuple with the time stamp value to start reading the logs from and
The number of log entries to skip at the start. This is for when
there are multiple entries at the same timestamp.
start_time (int): The time stamp value to start reading the logs from (default: 0).
skip (int): The number of log entries to skip at the start (default: 0). This is for when there are
multiple entries at the same timestamp.
Yields:
A tuple with:
dict: A CloudWatch log event with the following key-value pairs:
'timestamp' (int): The time of the event.
'message' (str): The log event data.
'ingestionTime' (int): The time the event was ingested.
Position: The new position
"""
start_time, skip = position
next_token = None
event_count = 1
while event_count > 0:
if next_token is not None:
token_arg = {"nextToken": next_token}
else:
token_arg = {}
response = client.get_log_events(
logGroupName=log_group,
logStreamName=stream_name,
startTime=start_time,
startFromHead=True,
**token_arg,
)
next_token = response["nextForwardToken"]
events = response["events"]
event_count = len(events)
if event_count > skip:
events = events[skip:]
skip = 0
else:
skip = skip - event_count
events = []
for ev in events:
ts, count = position
if ev["timestamp"] == ts:
position = Position(timestamp=ts, skip=count + 1)
else:
position = Position(timestamp=ev["timestamp"], skip=1)
yield ev, position