in source/LambdaLayers/applogger.py [0:0]
def flush(self):
"""Write the buffer to the CW Logs stream"""
# _create_log_stream will create the dated stream if it does not exist.
# It returns the name of the current stream. This way we always write to a
# date-stamped stream. Ex CIS_1-3-2020-06-02 for CIS_1-3
if self._buffer_size == 0:
return
log_stream = self._create_log_stream(log_stream=self.stream_name)
put_event_args = {
"logGroupName": self.log_group,
"logStreamName": log_stream,
"logEvents": [{"timestamp": r[0], "message": r[1]} for r in self._buffer]
}
# Send to CW Logs with retry if token has changed
while True:
try:
# add sequence token to API call parms if present
if self._stream_token:
put_event_args["sequenceToken"] = self._stream_token
resp = get_logs_connection(self.apiclient).put_log_events(**put_event_args)
self._stream_token = resp.get("nextSequenceToken", None)
break
except ClientError as ex:
exception_type = ex.response['Error']['Code']
# stream did exist but need new token, get it from exception data
if exception_type in ["InvalidSequenceTokenException", "DataAlreadyAcceptedException"]:
# update the token and retry
try:
self._stream_token = ex.response['Error']['Message'].split(":")[-1].strip()
print("Token changed. Will be retried.")
print(("Token for existing stream {} is {}".format(
self.stream_name, self._stream_token)))
except:
self._stream_token = None
raise
else:
print(("Error logstream {}, {}".format(self.stream_name, str(ex))))
break
self.clear()
self._buffer_size = 0