def tail()

in samcli/lib/observability/cw_logs/cw_log_puller.py [0:0]


    def tail(self, start_time: Optional[datetime] = None, filter_pattern: Optional[str] = None):
        if start_time:
            self.latest_event_time = to_timestamp(start_time)

        counter = self._max_retries
        while counter > 0 and not self.cancelled:
            LOG.debug("Tailing logs from %s starting at %s", self.cw_log_group, str(self.latest_event_time))

            counter -= 1
            try:
                self.load_time_period(to_datetime(self.latest_event_time), filter_pattern=filter_pattern)
            except ClientError as err:
                error_code = err.response.get("Error", {}).get("Code")
                if error_code == "ThrottlingException":
                    # if throttled, increase poll interval by 1 second each time
                    if self._poll_interval == 1:
                        self._poll_interval += 1
                    else:
                        self._poll_interval **= 2
                    LOG.warning(
                        "Throttled by CloudWatch Logs API, consider pulling logs for certain resources. "
                        "Increasing the poll interval time for resource %s to %s seconds",
                        self.cw_log_group,
                        self._poll_interval,
                    )
                else:
                    # if error is other than throttling, re-raise it
                    LOG.error("Failed while fetching new log events", exc_info=err)
                    raise err

            # This poll fetched logs. Reset the retry counter and set the timestamp for next poll
            if self.had_data:
                counter = self._max_retries
                self.latest_event_time += 1  # one extra millisecond to fetch next log event
                self.had_data = False

            # We already fetched logs once. Sleep for some time before querying again.
            # This also helps us scoot under the TPS limit for CloudWatch API call.
            time.sleep(self._poll_interval)