in source/lambda/util/logger.py [0:0]
def flush(self):
    """
    Writes all buffered messages to the CloudWatch log stream.

    The buffered ``(timestamp, message)`` entries are sent in one
    ``put_log_events`` call. On success the buffer and cached size are reset
    and the sequence token returned by the service is cached for the next
    flush. A failing put is retried up to 5 times with a 2 second pause; each
    retry drops the cached token and looks the token up again. If all attempts
    fail, the error and the buffered entries are printed to stdout and the
    buffer is left untouched, so the same entries are retried on the next
    flush. This method never raises for delivery errors.

    :return: None
    """
    max_put_retries = 5
    put_retry_delay = 2
    max_token_retries = 3
    token_retry_delay = 1

    def find_stream():
        """
        Returns the description of this logger's stream, or None if it is not listed.

        The lookup filters by name *prefix*, so the response may contain other
        streams whose names merely start with ours; only an exact match counts.
        """
        resp = self.client.describe_log_streams_with_retries(logGroupName=self._loggroup,
                                                             logStreamNamePrefix=self._logstream)
        for stream in resp.get("logStreams", []):
            if stream.get("logStreamName") == self._logstream:
                return stream
        return None

    def get_next_log_token():
        """
        Returns the sequence token to use for the next put, or None if there is none.

        Uses the cached token when present; otherwise reads it from the stream
        description, creating the stream first when it does not exist yet.
        """
        if self._log_sequence_token:
            return self._log_sequence_token

        stream = find_stream()
        if stream is not None:
            return stream.get("uploadSequenceToken")

        try:
            self.client.create_log_stream_with_retries(logGroupName=self._loggroup,
                                                       logStreamName=self._logstream)
        except Exception as e:
            # The exception is matched by class name, as in the original code;
            # an "already exists" error just means the stream is there now
            # (presumably created by a concurrent writer).
            if type(e).__name__ != "ResourceAlreadyExistsException":
                raise

        # The new stream may not be listed immediately: query again on every
        # attempt instead of re-inspecting a stale response.
        attempts = 0
        while True:
            stream = find_stream()
            if stream is not None:
                return stream.get("uploadSequenceToken")
            attempts += 1
            if attempts > max_token_retries:
                return None
            time.sleep(token_retry_delay)

    if not self._buffer:
        return

    put_event_args = {
        "logGroupName": self._loggroup,
        "logStreamName": self._logstream,
        "logEvents": [{"timestamp": r[0], "message": r[1]} for r in self._buffer]
    }

    next_token = None
    try:
        retries = 0
        while True:
            next_token = get_next_log_token()
            if next_token is not None:
                put_event_args["sequenceToken"] = next_token
            else:
                # Do not resend a token left over from a previous failed attempt.
                put_event_args.pop("sequenceToken", None)

            try:
                log_event_response = self.client.put_log_events(**put_event_args)
            except Exception:
                retries += 1
                if retries > max_put_retries:
                    raise
                # The cached token may be the reason for the failure; drop it so
                # the next attempt fetches a fresh one from the service.
                self._log_sequence_token = None
                time.sleep(put_retry_delay)
                continue

            # Kept outside the retry block: a response without a token must not
            # cause the already delivered events to be sent again.
            self._log_sequence_token = log_event_response.get("nextSequenceToken")
            self._buffer = []
            self._cached_size = 0
            return

    except Exception as ex:
        # Best effort: logging must never break the caller, so fall back to stdout.
        print("Error writing to logstream {} with token {} ({})".format(self._logstream, next_token, str(ex)))
        for entry in self._buffer:
            print(entry)