in azurelinuxagent/ga/collect_telemetry_events.py [0:0]
def _capture_extension_events(self, handler_name, handler_event_dir_path):
    """
    Capture the events an extension wrote to its events directory.

    Files in the directory whose names do not match self._EXTENSION_EVENT_FILE_NAME_REGEX are ignored. The
    matching files are visited in reverse-sorted name order; each one that passes self._event_file_size_allowed
    is handed to self._enqueue_events_and_get_count. Every visited file is deleted afterwards, whether or not it
    was processed successfully. Processing stops for this period once the captured count reaches
    self._MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD; files that were not visited are left on disk.

    A failure to delete a file is logged and does not interrupt processing of the remaining files, nor does it
    replace a ServiceStoppedError that is already propagating.

    :param handler_name: Complete Handler Name. Eg: Microsoft.CPlat.Core.RunCommandLinux
    :param handler_event_dir_path: Full path. Eg: '/var/log/azure/Microsoft.CPlat.Core.RunCommandLinux/events'
    :raises ServiceStoppedError: re-raised unchanged when raised while processing a file.
    :raises OSError: if the events directory itself cannot be listed.
    """
    # Filter out the files that do not follow the pre-defined EXTENSION_EVENT_FILE_NAME_REGEX
    event_files = [event_file for event_file in os.listdir(handler_event_dir_path) if
                   re.match(self._EXTENSION_EVENT_FILE_NAME_REGEX, event_file) is not None]
    # Pick the latest files first; once the per-period limit is reached the remaining (older) files are skipped
    event_files.sort(reverse=True)

    captured_extension_events_count = 0
    # Maps a drop reason to the number of events dropped for it; filled in by _enqueue_events_and_get_count
    dropped_events_with_error_count = defaultdict(int)

    try:
        for event_file in event_files:
            event_file_path = os.path.join(handler_event_dir_path, event_file)
            try:
                logger.verbose("Processing event file: {0}", event_file_path)

                # Files rejected by the size check are skipped, but still deleted by the finally clause below
                if not self._event_file_size_allowed(event_file_path):
                    continue

                # We support multiple events in a file, read the file and parse events.
                captured_extension_events_count = self._enqueue_events_and_get_count(
                    handler_name, event_file_path, captured_extension_events_count,
                    dropped_events_with_error_count)

                # We only allow MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD=300 maximum events per period per handler
                if captured_extension_events_count >= self._MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD:
                    msg = "Reached max count for the extension: {0}; Max Limit: {1}. Skipping the rest.".format(
                        handler_name, self._MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD)
                    logger.warn(msg)
                    add_log_event(level=logger.LogLevel.WARNING, message=msg, forced=True)
                    break
            except ServiceStoppedError:
                # Not logging here as it was already logged once; just re-raising.
                # The file is still deleted by the finally clause below: since we already started processing it,
                # some of its events could have been sent out. This is a trade-off between data replication
                # vs data loss.
                raise
            except Exception as error:
                msg = "Failed to process event file {0}:{1}".format(event_file,
                                                                    textutil.format_exception(error))
                logger.warn(msg)
                add_log_event(level=logger.LogLevel.WARNING, message=msg, forced=True)
            finally:
                # Todo: We should delete files after ensuring that we sent the data to Wireserver successfully
                # from our end rather than deleting first and sending later. This is to ensure the data reliability
                # of the agent telemetry pipeline.
                try:
                    os.remove(event_file_path)
                except OSError as error:
                    # An exception escaping this finally clause would replace an in-flight ServiceStoppedError
                    # and abort the remaining files, so report the failed deletion and carry on.
                    msg = "Failed to delete event file {0}:{1}".format(event_file,
                                                                      textutil.format_exception(error))
                    logger.warn(msg)
                    add_log_event(level=logger.LogLevel.WARNING, message=msg, forced=True)
    finally:
        # Always report the summary, even when an exception is propagating out of the loop above
        if dropped_events_with_error_count:
            msg = "Dropped events for Extension: {0}; Details:\n\t{1}".format(handler_name, '\n\t'.join(
                ["Reason: {0}; Dropped Count: {1}".format(k, v) for k, v in dropped_events_with_error_count.items()]))
            logger.warn(msg)
            add_log_event(level=logger.LogLevel.WARNING, message=msg, forced=True)

        if captured_extension_events_count > 0:
            logger.info("Collected {0} events for extension: {1}".format(captured_extension_events_count, handler_name))