def __print_log_stream()

in awsbatch-cli/src/awsbatch/awsbout.py [0:0]


    def __print_log_stream(self, log_stream, head=None, tail=None, stream=None, stream_period=None):  # noqa: C901 FIXME
        """
        Ask for log stream and print it.

        The events are read from the "/aws/batch/job" log group. The first request is shaped by ``head``/``tail``;
        further pages are requested only when the whole stream was asked for (neither ``head`` nor ``tail``)
        or when ``stream`` is set.

        :param log_stream: job log stream
        :param head: if truthy, number of events to print starting from the beginning of the stream
        :param tail: if truthy (and ``head`` is not), number of events to print from the end of the stream
        :param stream: if truthy, keep polling for new events until interrupted by the user
        :param stream_period: seconds to sleep before each poll when ``stream`` is set; 5 when falsy

        A KeyboardInterrupt is logged and ends the process with exit code 0.
        Any other exception is reported through ``fail``.
        """
        logs_client = self.boto3_factory.get_client("logs")
        log_group_name = "/aws/batch/job"
        try:
            # The maximum number of log events returned by the get_log_events function is as many log events
            # as can fit in a response size of 1 MB, up to 10,000 log events
            max_limit = 10000
            # Track "no explicit head/tail" separately from the limit value, so that an explicit
            # head/tail equal to max_limit is not mistaken for a request to print the whole stream.
            fetch_all = not head and not tail
            if head:
                limit, start_from_head = head, True
            elif tail:
                limit, start_from_head = tail, False
            else:
                limit, start_from_head = max_limit, False

            response = logs_client.get_log_events(
                logGroupName=log_group_name, logStreamName=log_stream, limit=limit, startFromHead=start_from_head
            )
            events = response["events"]
            self.log.debug(response)
            if not events:
                print("No events found.")

            self.__print_events(events)
            if fetch_all or stream:
                # get paginated items
                period = stream_period if stream_period else 5
                next_token = response["nextForwardToken"]
                # When streaming, the loop only ends on an exception (e.g. KeyboardInterrupt).
                while next_token is not None or stream:
                    self.log.info("Next Forward Token is (%s)" % next_token)
                    if stream:
                        self.log.info("Waiting other %s seconds..." % period)
                        time.sleep(period)
                    response = logs_client.get_log_events(
                        logGroupName=log_group_name, logStreamName=log_stream, nextToken=next_token
                    )
                    self.__print_events(response["events"])
                    new_token = response["nextForwardToken"]
                    # if nextForwardToken is the same we passed in, we reached the end of the stream;
                    # when streaming we keep polling with that same token instead of stopping
                    if not stream and new_token == next_token:
                        next_token = None
                    else:
                        next_token = new_token
        except KeyboardInterrupt:
            self.log.info("Interrupted by the user")
            sys.exit(0)
        except Exception as e:
            fail("Error getting log events from AWS CloudWatch Logs. Failed with exception: %s" % e)