def log_stream()

in source/sagemaker/sagemaker_graph_fraud_detection/container_build/logs.py [0:0]


def log_stream(client, log_group, stream_name, position):
    """Yield the events of one CloudWatch log stream together with a resume position.

    Pages forward through the stream with ``get_log_events`` (reading from the
    head, starting at the time stamp held in ``position``) and stops after the
    first request that comes back with no events.

    Args:
        client (boto3.CloudWatchLogs.Client): The Boto client for CloudWatch logs.
        log_group (str): The name of the log group.
        stream_name (str): The name of the specific stream.
        position (Position): A ``(timestamp, skip)`` pair: the time stamp to start
            reading from, and how many of the first returned events to drop.
            The skip count exists because several entries can share the same
            time stamp.

    Yields:
        tuple: ``(event, position)`` where

        * ``event`` (dict) is the log event as returned by the service, with the
          keys ``'timestamp'`` (int), ``'message'`` (str) and
          ``'ingestionTime'`` (int);
        * ``position`` (Position) is the updated position: the time stamp of
          ``event`` and the number of events counted at that time stamp so far.
    """
    # NOTE(review): ``Position`` is not defined in this block; it is presumably
    # a module-level namedtuple with ``timestamp`` and ``skip`` fields.
    first_timestamp, pending_skip = position

    # Arguments that stay the same for every page request.
    base_request = {
        "logGroupName": log_group,
        "logStreamName": stream_name,
        "startTime": first_timestamp,
        "startFromHead": True,
    }
    forward_token = None

    while True:
        # The very first request must not carry a token at all.
        paging = {} if forward_token is None else {"nextToken": forward_token}
        page = client.get_log_events(**base_request, **paging)
        forward_token = page["nextForwardToken"]
        batch = page["events"]
        fetched = len(batch)

        if fetched > pending_skip:
            fresh = batch[pending_skip:]
            pending_skip = 0
        else:
            # The whole page is swallowed by the skip; carry the remainder over
            # to the next page.
            pending_skip = pending_skip - fetched
            fresh = []

        for event in fresh:
            last_timestamp, seen = position
            if event["timestamp"] == last_timestamp:
                # One more entry at the time stamp we are already tracking.
                seen = seen + 1
            else:
                last_timestamp, seen = event["timestamp"], 1
            position = Position(timestamp=last_timestamp, skip=seen)
            yield event, position

        # An empty page ends the iteration.
        if not fetched > 0:
            break