in awsbatch-cli/src/awsbatch/awsbout.py [0:0]
def __print_log_stream(self, log_stream, head=None, tail=None, stream=None, stream_period=None): # noqa: C901 FIXME
"""
Ask for log stream and print it.
:param log_stream: job log stream
"""
logs_client = self.boto3_factory.get_client("logs")
try:
# The maximum number of log events returned by the get_log_events function is as many log events
# as can fit in a response size of 1 MB, up to 10,000 log events
max_limit = 10000
if head:
limit = head
start_from_head = True
elif tail:
limit = tail
start_from_head = False
else:
limit = max_limit
start_from_head = False
response = logs_client.get_log_events(
logGroupName="/aws/batch/job", logStreamName=log_stream, limit=limit, startFromHead=start_from_head
)
events = response["events"]
self.log.debug(response)
if not events:
print("No events found.")
self.__print_events(events)
if limit == max_limit or stream:
# get paginated items
next_token = response["nextForwardToken"]
while next_token is not None or stream:
self.log.info("Next Forward Token is (%s)" % next_token)
if stream:
period = stream_period if stream_period else 5
self.log.info("Waiting other %s seconds..." % period)
time.sleep(period)
response = logs_client.get_log_events(
logGroupName="/aws/batch/job", logStreamName=log_stream, nextToken=next_token
)
self.__print_events(response["events"])
# if nextForwardToken is the same we passed in, we reached the end of the stream
if stream:
next_token = response["nextForwardToken"]
else:
next_token = (
response["nextForwardToken"] if response["nextForwardToken"] != next_token else None
)
except KeyboardInterrupt:
self.log.info("Interrupted by the user")
sys.exit(0)
except Exception as e:
fail("Error listing jobs from AWS Batch. Failed with exception: %s" % e)