in samcli/lib/observability/cw_logs/cw_log_puller.py [0:0]
def tail(self, start_time: Optional[datetime] = None, filter_pattern: Optional[str] = None) -> None:
    """
    Continuously poll CloudWatch Logs for new events until cancelled or idle.

    Repeatedly calls ``self.load_time_period`` starting from
    ``self.latest_event_time``, advancing that timestamp whenever a poll
    returns data. Polling stops when ``self.cancelled`` is set or after
    ``self._max_retries`` consecutive polls return no data.

    Parameters
    ----------
    start_time : Optional[datetime]
        If given, tailing starts at this time (converted via ``to_timestamp``);
        otherwise it resumes from the current ``self.latest_event_time``.
    filter_pattern : Optional[str]
        CloudWatch Logs filter pattern forwarded to each fetch.

    Raises
    ------
    ClientError
        Re-raised for any CloudWatch API error other than a
        ``ThrottlingException`` (which is absorbed with a backoff instead).
    """
    if start_time:
        self.latest_event_time = to_timestamp(start_time)

    # Retry budget: each empty poll consumes one attempt; a poll that
    # returns data (self.had_data) refills the budget below.
    counter = self._max_retries
    # Cap for the throttling backoff. Without a bound, the squaring backoff
    # below explodes (2, 4, 16, 256, 65536, ...) and the tailer would sleep
    # for hours after a few consecutive throttles.
    max_poll_interval = 64
    while counter > 0 and not self.cancelled:
        LOG.debug("Tailing logs from %s starting at %s", self.cw_log_group, str(self.latest_event_time))
        counter -= 1
        try:
            self.load_time_period(to_datetime(self.latest_event_time), filter_pattern=filter_pattern)
        except ClientError as err:
            error_code = err.response.get("Error", {}).get("Code")
            if error_code == "ThrottlingException":
                # If throttled, back off aggressively: 1 -> 2, then square
                # the interval each time, bounded by max_poll_interval.
                if self._poll_interval == 1:
                    self._poll_interval += 1
                else:
                    self._poll_interval = min(self._poll_interval ** 2, max_poll_interval)
                LOG.warning(
                    "Throttled by CloudWatch Logs API, consider pulling logs for certain resources. "
                    "Increasing the poll interval time for resource %s to %s seconds",
                    self.cw_log_group,
                    self._poll_interval,
                )
            else:
                # Any error other than throttling is unexpected: log it and
                # re-raise. Bare `raise` preserves the original traceback
                # without stacking an extra re-raise frame.
                LOG.error("Failed while fetching new log events", exc_info=err)
                raise
        # This poll fetched logs. Reset the retry counter and set the timestamp for next poll
        if self.had_data:
            counter = self._max_retries
            self.latest_event_time += 1  # one extra millisecond to fetch next log event
            self.had_data = False
        # We already fetched logs once. Sleep for some time before querying again.
        # This also helps us scoot under the TPS limit for CloudWatch API call.
        time.sleep(self._poll_interval)