def _capture_extension_events()

in azurelinuxagent/ga/collect_telemetry_events.py [0:0]


    def _capture_extension_events(self, handler_name, handler_event_dir_path):
        """
        Walk the event files of one extension handler, enqueue the events they contain and delete the files.

        Only file names matching _EXTENSION_EVENT_FILE_NAME_REGEX are considered. Every file that is visited is
        removed from disk afterwards, even if it was skipped because of its size or could not be processed.
        Processing stops early once the running event count reaches _MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD;
        files that were not visited yet are left in place. A ServiceStoppedError is propagated to the caller, any
        other per-file failure is reported as a warning and the next file is tried.
        :param handler_name: Complete Handler Name. Eg: Microsoft.CPlat.Core.RunCommandLinux
        :param handler_event_dir_path: Full path. Eg: '/var/log/azure/Microsoft.CPlat.Core.RunCommandLinux/events'
        """

        # Keep only the names that follow the pre-defined EXTENSION_EVENT_FILE_NAME_REGEX and order them in reverse,
        # so the latest files are handled first and older ones are the ones skipped when the limit is reached.
        pending_files = sorted(
            (name for name in os.listdir(handler_event_dir_path)
             if re.match(self._EXTENSION_EVENT_FILE_NAME_REGEX, name) is not None),
            reverse=True)

        total_captured = 0
        drop_counts_by_reason = defaultdict(int)

        try:
            for file_name in pending_files:
                file_path = os.path.join(handler_event_dir_path, file_name)

                try:
                    logger.verbose("Processing event file: {0}", file_path)

                    # Files rejected by the size check are not parsed; they are still deleted in the finally below.
                    if self._event_file_size_allowed(file_path):
                        # One file may carry several events; the helper returns the updated running total and
                        # records the reasons for any dropped events in drop_counts_by_reason.
                        total_captured = self._enqueue_events_and_get_count(handler_name, file_path,
                                                                            total_captured,
                                                                            drop_counts_by_reason)

                        # Per handler and per period, at most MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD events
                        # are accepted.
                        limit = self._MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD
                        if total_captured >= limit:
                            limit_msg = "Reached max count for the extension: {0}; Max Limit: {1}. Skipping the rest.".format(
                                handler_name, limit)
                            logger.warn(limit_msg)
                            add_log_event(level=logger.LogLevel.WARNING, message=limit_msg, forced=True)
                            break
                except ServiceStoppedError:
                    # Already logged once where it was raised, so just let it bubble up. The file is still deleted
                    # (see finally) because part of its events may have been sent already: duplicating data is
                    # traded against losing it.
                    raise
                except Exception as error:
                    failure_msg = "Failed to process event file {0}:{1}".format(file_name,
                                                                                textutil.format_exception(error))
                    logger.warn(failure_msg)
                    add_log_event(level=logger.LogLevel.WARNING, message=failure_msg, forced=True)
                finally:
                    # Todo: Delete the file only after the data is known to have reached Wireserver, instead of
                    # deleting first and sending later, to improve the reliability of the agent telemetry pipeline.
                    os.remove(file_path)

        finally:
            # Summaries are emitted on every exit path, including when an exception is propagating.
            if drop_counts_by_reason:
                reason_lines = ["Reason: {0}; Dropped Count: {1}".format(reason, count)
                                for reason, count in drop_counts_by_reason.items()]
                dropped_msg = "Dropped events for Extension: {0}; Details:\n\t{1}".format(handler_name,
                                                                                          '\n\t'.join(reason_lines))
                logger.warn(dropped_msg)
                add_log_event(level=logger.LogLevel.WARNING, message=dropped_msg, forced=True)

            if total_captured > 0:
                logger.info("Collected {0} events for extension: {1}".format(total_captured, handler_name))