azurelinuxagent/ga/collect_telemetry_events.py (391 lines of code) (raw):
# Microsoft Azure Linux Agent
#
# Copyright 2020 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
#
import datetime
import json
import os
import re
import threading
import time
from collections import defaultdict
import azurelinuxagent.common.logger as logger
from azurelinuxagent.common import conf
from azurelinuxagent.common.agent_supported_feature import get_supported_feature_by_name, SupportedFeatureNames
from azurelinuxagent.common.event import EVENTS_DIRECTORY, TELEMETRY_LOG_EVENT_ID, \
TELEMETRY_LOG_PROVIDER_ID, add_event, WALAEventOperation, add_log_event, get_event_logger, \
CollectOrReportEventDebugInfo, EVENT_FILE_REGEX, parse_event
from azurelinuxagent.common.exception import InvalidExtensionEventError, ServiceStoppedError, EventError
from azurelinuxagent.common.future import ustr, is_file_not_found_error
from azurelinuxagent.ga.interfaces import ThreadHandlerInterface
from azurelinuxagent.common.telemetryevent import TelemetryEvent, TelemetryEventParam, \
GuestAgentGenericLogsSchema, GuestAgentExtensionEventsSchema
from azurelinuxagent.common.utils import textutil
from azurelinuxagent.ga.exthandlers import HANDLER_NAME_PATTERN
from azurelinuxagent.ga.periodic_operation import PeriodicOperation
# Event file specific retries and delays.
# Reading an event file is retried when the file is missing or its content cannot be parsed (it may have been
# deleted or modified while being read); the error is re-raised once this many attempts have failed.
NUM_OF_EVENT_FILE_RETRIES = 3
# Pause between two consecutive read attempts.
EVENT_FILE_RETRY_DELAY = 1 # seconds
def get_collect_telemetry_events_handler(send_telemetry_events_handler):
    """
    Factory for the telemetry events collector.
    :param send_telemetry_events_handler: Handler that the collected events are enqueued on.
    :return: A new CollectTelemetryEventsHandler bound to the given send handler.
    """
    collector = CollectTelemetryEventsHandler(send_telemetry_events_handler)
    return collector
class ExtensionEventSchema(object):
    """
    Names of the fields that make up an Extension Event.

    Every public attribute of this class is a field name; the set of required fields is derived
    from these attributes, so only field names should be added here.

    Sample Extension Event Example:
        {
            "Version":"1.0.0.23",
            "Timestamp":"2018-01-02T22:08:12.510696Z"   //(time in UTC (ISO-8601 standard),
            "TaskName":"TestRun"                        //Open for publishers,
            "EventLevel":"Critical/Error/Warning/Verbose/Informational/LogAlways",
            "Message": "Successful test"                //(max 3K, 3072 characters),
            "EventPid":"1",
            "EventTid":"2",
            "OperationId":"Guid (str)"
        }

    From the next version (2.10+) integer values are accepted for the EventPid and EventTid fields; string values
    are still supported for backward compatibility.
    """
    # Identification of the event and of its publisher
    Version = "Version"
    Timestamp = "Timestamp"
    TaskName = "TaskName"
    OperationId = "OperationId"

    # Severity and payload
    EventLevel = "EventLevel"
    Message = "Message"

    # Process/thread that emitted the event
    EventPid = "EventPid"
    EventTid = "EventTid"
class _ProcessExtensionEvents(PeriodicOperation):
    """
    Periodic operation for collecting extension telemetry events and enqueueing them for the SendTelemetryHandler thread.
    """

    _EXTENSION_EVENT_COLLECTION_PERIOD = datetime.timedelta(seconds=conf.get_etp_collection_period())
    _EXTENSION_EVENT_FILE_NAME_REGEX = re.compile(r"^(\d+)\.json$", re.IGNORECASE)

    # Limits
    _MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD = 360
    _EXTENSION_EVENT_FILE_MAX_SIZE = 4 * 1024 * 1024  # 4 MB = 4 * 1,048,576 Bytes
    _EXTENSION_EVENT_MAX_SIZE = 1024 * 6  # 6Kb or 6144 characters. Limit for the whole event. Prevent oversized events.
    _EXTENSION_EVENT_MAX_MSG_LEN = 1024 * 3  # 3Kb or 3072 chars.

    # Lower-cased names of every field declared in ExtensionEventSchema
    _EXTENSION_EVENT_REQUIRED_FIELDS = [attr.lower() for attr in dir(ExtensionEventSchema) if
                                        not callable(getattr(ExtensionEventSchema, attr)) and not attr.startswith("__")]

    def __init__(self, send_telemetry_events_handler):
        """
        :param send_telemetry_events_handler: Handler on which the parsed events are enqueued.
        """
        super(_ProcessExtensionEvents, self).__init__(_ProcessExtensionEvents._EXTENSION_EVENT_COLLECTION_PERIOD)

        self._send_telemetry_events_handler = send_telemetry_events_handler

    def _operation(self):
        """
        Collect the events of every extension handler that has an events directory and enqueue them.
        The events directories are emptied afterwards, unless the send handler stopped while collecting.
        """
        if self._send_telemetry_events_handler.stopped():
            logger.warn("{0} service is not running, skipping current iteration".format(
                self._send_telemetry_events_handler.get_thread_name()))
            return

        delete_all_event_files = True
        extension_handler_with_event_dirs = []

        try:
            extension_handler_with_event_dirs = self._get_extension_events_dir_with_handler_name(conf.get_ext_log_dir())

            if not extension_handler_with_event_dirs:
                logger.verbose("No Extension events directory exist")
                return

            for extension_handler_with_event_dir in extension_handler_with_event_dirs:
                handler_name = extension_handler_with_event_dir[0]
                handler_event_dir_path = extension_handler_with_event_dir[1]
                self._capture_extension_events(handler_name, handler_event_dir_path)
        except ServiceStoppedError:
            # Since the service stopped, we should not delete the extension files and retry sending them whenever
            # the telemetry service comes back up
            delete_all_event_files = False
        except Exception as error:
            msg = "Unknown error occurred when trying to collect extension events:{0}".format(
                textutil.format_exception(error))
            add_event(op=WALAEventOperation.ExtensionTelemetryEventProcessing, message=msg, is_success=False)
        finally:
            # Always ensure that the events directory are being deleted each run except when Telemetry Service is stopped,
            # even if we run into an error and dont process them this run.
            if delete_all_event_files:
                self._ensure_all_events_directories_empty(extension_handler_with_event_dirs)

    @staticmethod
    def _get_extension_events_dir_with_handler_name(extension_log_dir):
        """
        Get the full path to events directory for all extension handlers that have one
        :param extension_log_dir: Base log directory for all extensions
        :return: A list of (handler_name, events_directory_full_path) tuples, one per handler whose events
                 directory exists
        """
        extension_handler_with_event_dirs = []

        for ext_handler_name in os.listdir(extension_log_dir):
            # Check if its an Extension directory
            if not os.path.isdir(os.path.join(extension_log_dir, ext_handler_name)) \
                    or re.match(HANDLER_NAME_PATTERN, ext_handler_name) is None:
                continue

            # Check if EVENTS_DIRECTORY directory exists
            extension_event_dir = os.path.join(extension_log_dir, ext_handler_name, EVENTS_DIRECTORY)
            if os.path.exists(extension_event_dir):
                extension_handler_with_event_dirs.append((ext_handler_name, extension_event_dir))

        return extension_handler_with_event_dirs

    def _event_file_size_allowed(self, event_file_path):
        """
        Check the size of an event file against _EXTENSION_EVENT_FILE_MAX_SIZE; oversized files are reported.
        :param event_file_path: Full path of the event file.
        :return: True if the file can be processed, False if it is larger than the limit.
        """
        event_file_size = os.stat(event_file_path).st_size
        if event_file_size > self._EXTENSION_EVENT_FILE_MAX_SIZE:
            # The limit is defined in units of 1024 * 1024 bytes; report sizes in the same unit so the
            # message shows the limit as 4.0 rather than 4.2
            convert_to_mb = lambda x: (1.0 * x) / (1024 * 1024)
            msg = "Skipping file: {0} as its size is {1:.2f} Mb > Max size allowed {2:.1f} Mb".format(
                event_file_path, convert_to_mb(event_file_size),
                convert_to_mb(self._EXTENSION_EVENT_FILE_MAX_SIZE))
            logger.warn(msg)
            add_log_event(level=logger.LogLevel.WARNING, message=msg, forced=True)
            return False
        return True

    @staticmethod
    def _remove_event_file(event_file_path):
        """
        Delete an event file, tolerating the case where the file no longer exists.
        :param event_file_path: Full path of the event file.
        """
        try:
            os.remove(event_file_path)
        except Exception as error:
            # This runs from a 'finally' block: raising because the file is already gone would abort the
            # processing of the remaining files and replace any exception that is currently propagating.
            if not is_file_not_found_error(error):
                raise

    def _capture_extension_events(self, handler_name, handler_event_dir_path):
        """
        Capture Extension events and enqueue them on the send handler. Every file that is visited is deleted.
        :param handler_name: Complete Handler Name. Eg: Microsoft.CPlat.Core.RunCommandLinux
        :param handler_event_dir_path: Full path. Eg: '/var/log/azure/Microsoft.CPlat.Core.RunCommandLinux/events'
        :raises ServiceStoppedError: if the send handler stopped while events were being enqueued.
        """

        # Filter out the files that do not follow the pre-defined EXTENSION_EVENT_FILE_NAME_REGEX
        event_files = [event_file for event_file in os.listdir(handler_event_dir_path) if
                       re.match(self._EXTENSION_EVENT_FILE_NAME_REGEX, event_file) is not None]
        # Pick the latest files first, we'll discard older events if len(events) > MAX_EVENT_COUNT
        event_files.sort(reverse=True)

        captured_extension_events_count = 0
        dropped_events_with_error_count = defaultdict(int)

        try:
            for event_file in event_files:

                event_file_path = os.path.join(handler_event_dir_path, event_file)
                try:
                    logger.verbose("Processing event file: {0}", event_file_path)

                    if not self._event_file_size_allowed(event_file_path):
                        continue

                    # We support multiple events in a file, read the file and parse events.
                    captured_extension_events_count = self._enqueue_events_and_get_count(handler_name, event_file_path,
                                                                                         captured_extension_events_count,
                                                                                         dropped_events_with_error_count)

                    # We only allow _MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD maximum events per period per handler
                    if captured_extension_events_count >= self._MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD:
                        msg = "Reached max count for the extension: {0}; Max Limit: {1}. Skipping the rest.".format(
                            handler_name, self._MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD)
                        logger.warn(msg)
                        add_log_event(level=logger.LogLevel.WARNING, message=msg, forced=True)
                        break
                except ServiceStoppedError:
                    # Not logging here as already logged once, re-raising
                    # Since we already started processing this file, deleting it as we could've already sent some events out
                    # This is a trade-off between data replication vs data loss.
                    raise
                except Exception as error:
                    msg = "Failed to process event file {0}:{1}".format(event_file,
                                                                        textutil.format_exception(error))
                    logger.warn(msg)
                    add_log_event(level=logger.LogLevel.WARNING, message=msg, forced=True)
                finally:
                    # Todo: We should delete files after ensuring that we sent the data to Wireserver successfully
                    # from our end rather than deleting first and sending later. This is to ensure the data reliability
                    # of the agent telemetry pipeline.
                    self._remove_event_file(event_file_path)
        finally:
            if dropped_events_with_error_count:
                msg = "Dropped events for Extension: {0}; Details:\n\t{1}".format(handler_name, '\n\t'.join(
                    ["Reason: {0}; Dropped Count: {1}".format(k, v) for k, v in dropped_events_with_error_count.items()]))
                logger.warn(msg)
                add_log_event(level=logger.LogLevel.WARNING, message=msg, forced=True)

            if captured_extension_events_count > 0:
                logger.info("Collected {0} events for extension: {1}".format(captured_extension_events_count, handler_name))

    @staticmethod
    def _ensure_all_events_directories_empty(extension_events_directories):
        """
        Delete every file left in the given events directories.
        :param extension_events_directories: List of (handler_name, events_directory_full_path) tuples.
        """
        if not extension_events_directories:
            return

        for extension_handler_with_event_dir in extension_events_directories:
            event_dir_path = extension_handler_with_event_dir[1]
            if not os.path.exists(event_dir_path):
                # Nothing to clean for this handler; keep going so the remaining handlers are still cleaned
                continue

            log_err = True
            # Delete any residue files in the events directory
            for residue_file in os.listdir(event_dir_path):
                try:
                    os.remove(os.path.join(event_dir_path, residue_file))
                except Exception as error:
                    # Only log the first error once per handler per run to keep the logfile clean
                    if log_err:
                        logger.error("Failed to completely clear the {0} directory. Exception: {1}", event_dir_path,
                                     ustr(error))
                        log_err = False

    @staticmethod
    def _read_event_file(event_file_path):
        """
        Read the event file and return the data.
        :param event_file_path: Full path of the event file.
        :return: The decoded JSON content of the file (a list of events, or a single event).
        """
        # Retry for reading the event file in case file is modified while reading
        # We catch file-not-found errors and ValueError to handle the case where the file is deleted or modified
        # while reading; any other error is raised immediately
        error_count = 0
        while True:
            try:
                # Read event file and decode it properly
                with open(event_file_path, "rb") as event_file_descriptor:
                    event_data = event_file_descriptor.read().decode("utf-8")

                # Parse the string and get the list of events
                return json.loads(event_data)
            except Exception as e:
                if is_file_not_found_error(e) or isinstance(e, ValueError):
                    error_count += 1
                    if error_count >= NUM_OF_EVENT_FILE_RETRIES:
                        raise
                else:
                    raise
            time.sleep(EVENT_FILE_RETRY_DELAY)

    def _enqueue_events_and_get_count(self, handler_name, event_file_path, captured_events_count,
                                      dropped_events_with_error_count):
        """
        Parse the events of one file and enqueue them on the send handler.
        :param handler_name: Complete Handler Name.
        :param event_file_path: Full path of the event file.
        :param captured_events_count: Number of events already captured for this handler in this period.
        :param dropped_events_with_error_count: Dict of error message -> count, updated in place for invalid events.
        :return: The updated number of captured events.
        :raises ServiceStoppedError: if the send handler stopped.
        """

        event_file_time = datetime.datetime.fromtimestamp(os.path.getmtime(event_file_path))

        events = self._read_event_file(event_file_path)

        # We allow multiple events in a file but there can be an instance where the file only has a single
        # JSON event and not a list. Handling that condition too
        if not isinstance(events, list):
            events = [events]

        for event in events:
            try:
                self._send_telemetry_events_handler.enqueue_event(
                    self._parse_telemetry_event(handler_name, event, event_file_time)
                )
                captured_events_count += 1
            except InvalidExtensionEventError as invalid_error:
                # These are the errors thrown if there's an error parsing the event. We want to report these back to the
                # extension publishers so that they are aware of the issues.
                # The error messages are all static messages, we will use this to create a dict and emit an event at the
                # end of each run to notify if there were any errors parsing events for the extension
                dropped_events_with_error_count[ustr(invalid_error)] += 1
            except ServiceStoppedError as stopped_error:
                logger.error(
                    "Unable to enqueue events as service stopped: {0}. Stopping collecting extension events".format(
                        ustr(stopped_error)))
                raise
            except Exception as error:
                logger.warn("Unable to parse and transmit event, error: {0}".format(error))

            if captured_events_count >= self._MAX_NUMBER_OF_EVENTS_PER_EXTENSION_PER_PERIOD:
                break

        return captured_events_count

    def _parse_telemetry_event(self, handler_name, extension_unparsed_event, event_file_time):
        """
        Parse the Json event file and convert it to TelemetryEvent object with the required data.
        :param handler_name: Complete Handler Name; used as prefix of the event name.
        :param extension_unparsed_event: The event as read from the event file.
        :param event_file_time: Modification time of the event file, passed on when adding the common parameters.
        :return: Complete TelemetryEvent with all required fields filled up properly. Raises if event breaches contract.
        """

        extension_event = self._parse_event_and_ensure_it_is_valid(extension_unparsed_event)

        # Create a telemetry event, add all common parameters to the event
        # and then overwrite all the common params with extension events params if same

        event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID)
        event.file_type = "json"
        CollectTelemetryEventsHandler.add_common_params_to_telemetry_event(event, event_file_time)

        replace_or_add_params = {
            GuestAgentGenericLogsSchema.EventName: "{0}-{1}".format(handler_name, extension_event[
                ExtensionEventSchema.Version.lower()]),
            GuestAgentGenericLogsSchema.CapabilityUsed: extension_event[ExtensionEventSchema.EventLevel.lower()],
            GuestAgentGenericLogsSchema.TaskName: extension_event[ExtensionEventSchema.TaskName.lower()],
            GuestAgentGenericLogsSchema.Context1: extension_event[ExtensionEventSchema.Message.lower()],
            GuestAgentGenericLogsSchema.Context2: extension_event[ExtensionEventSchema.Timestamp.lower()],
            GuestAgentGenericLogsSchema.Context3: extension_event[ExtensionEventSchema.OperationId.lower()],
            GuestAgentGenericLogsSchema.EventPid: extension_event[ExtensionEventSchema.EventPid.lower()],
            GuestAgentGenericLogsSchema.EventTid: extension_event[ExtensionEventSchema.EventTid.lower()]
        }
        self._replace_or_add_param_in_event(event, replace_or_add_params)
        return event

    def _parse_event_and_ensure_it_is_valid(self, extension_event):
        """
        Parse the Json event from file. Raise InvalidExtensionEventError if the event breaches pre-set contract.
        :param extension_event: The json event from file
        :return: Verified Json event that qualifies the contract.
        """

        def _clean_value(k, v):
            # Integer EventPid/EventTid values are converted to strings; every other non-None value is
            # expected to be a string and is stripped (a value without strip() raises, and the caller
            # drops the event).
            if v is not None:
                if isinstance(v, int):
                    if k.lower() in [ExtensionEventSchema.EventPid.lower(), ExtensionEventSchema.EventTid.lower()]:
                        return str(v)
                return v.strip()
            return v

        event_size = 0
        key_err_msg = "{0}: {1} not found"

        # Convert the dict to all lower keys to avoid schema confusion.
        # Only pick the params that we care about and skip the rest.
        event = dict((k.lower(), _clean_value(k, v)) for k, v in extension_event.items() if
                     k.lower() in self._EXTENSION_EVENT_REQUIRED_FIELDS)

        # Trim message and only pick the first 3k chars
        message_key = ExtensionEventSchema.Message.lower()
        if message_key in event:
            event[message_key] = event[message_key][:self._EXTENSION_EVENT_MAX_MSG_LEN]
        else:
            raise InvalidExtensionEventError(
                key_err_msg.format(InvalidExtensionEventError.MissingKeyError, ExtensionEventSchema.Message))

        if not event[message_key]:
            raise InvalidExtensionEventError(
                "{0}: {1} should not be empty".format(InvalidExtensionEventError.EmptyMessageError,
                                                      ExtensionEventSchema.Message))

        for required_key in self._EXTENSION_EVENT_REQUIRED_FIELDS:
            # If all required keys not in event then raise
            if required_key not in event:
                raise InvalidExtensionEventError(
                    key_err_msg.format(InvalidExtensionEventError.MissingKeyError, required_key))

            # Account for this field before checking the limit, so that every field (including the last
            # one) counts towards the event size.
            event_size += len(event[required_key])

            # If the event_size > _EXTENSION_EVENT_MAX_SIZE=6k, then raise
            if event_size > self._EXTENSION_EVENT_MAX_SIZE:
                raise InvalidExtensionEventError(
                    "{0}: max event size allowed: {1}".format(InvalidExtensionEventError.OversizeEventError,
                                                              self._EXTENSION_EVENT_MAX_SIZE))

        return event

    @staticmethod
    def _replace_or_add_param_in_event(event, replace_or_add_params):
        """
        Overwrite the parameters of the event whose name is in replace_or_add_params and append the others.
        :param event: Event whose parameters are updated in place.
        :param replace_or_add_params: Dict of parameter name -> value. Entries are removed from this dict as
                                      they are applied to existing parameters.
        """
        for param in event.parameters:
            if param.name in replace_or_add_params:
                param.value = replace_or_add_params.pop(param.name)

            if not replace_or_add_params:
                # All values replaced, return
                return

        # Add the remaining params to the event
        for param_name in replace_or_add_params:
            event.parameters.append(TelemetryEventParam(param_name, replace_or_add_params[param_name]))
class _CollectAndEnqueueEvents(PeriodicOperation):
    """
    Periodic operation to collect telemetry events located in the events folder and enqueue them for the
    SendTelemetryHandler thread.
    """

    _EVENT_COLLECTION_PERIOD = datetime.timedelta(minutes=1)

    def __init__(self, send_telemetry_events_handler):
        """
        :param send_telemetry_events_handler: Handler on which the collected events are enqueued.
        """
        super(_CollectAndEnqueueEvents, self).__init__(_CollectAndEnqueueEvents._EVENT_COLLECTION_PERIOD)
        self._send_telemetry_events_handler = send_telemetry_events_handler

    def _operation(self):
        """
        Periodically send any events located in the events folder
        """
        try:
            if self._send_telemetry_events_handler.stopped():
                logger.warn("{0} service is not running, skipping iteration.".format(
                    self._send_telemetry_events_handler.get_thread_name()))
                return

            self.process_events()

        except Exception as error:
            err_msg = "Failure in collecting telemetry events: {0}".format(ustr(error))
            add_event(op=WALAEventOperation.UnhandledError, message=err_msg, is_success=False)

    def process_events(self):
        """
        Enqueue the events found in the events directory on the send handler and delete the corresponding files.
        Collection stops as soon as the send handler reports that it is stopped; the files that were not
        visited yet are left in place.
        """
        event_directory_full_path = os.path.join(conf.get_lib_dir(), EVENTS_DIRECTORY)
        event_files = os.listdir(event_directory_full_path)
        debug_info = CollectOrReportEventDebugInfo(operation=CollectOrReportEventDebugInfo.OP_COLLECT)

        for event_file in event_files:
            try:
                match = EVENT_FILE_REGEX.search(event_file)
                if match is None:
                    continue

                event_file_path = os.path.join(event_directory_full_path, event_file)

                try:
                    logger.verbose("Processing event file: {0}", event_file_path)

                    event = self._read_and_parse_event_file(event_file_path)

                    # "legacy" events are events produced by previous versions of the agent (<= 2.2.46) and extensions;
                    # they do not include all the telemetry fields, so we add them here
                    is_legacy_event = match.group('agent_event') is None

                    if is_legacy_event:
                        # We'll use the file's last modification time for the event's timestamp
                        event_file_creation_time_epoch = os.path.getmtime(event_file_path)
                        event_file_creation_time = datetime.datetime.fromtimestamp(event_file_creation_time_epoch)

                        if event.is_extension_event():
                            _CollectAndEnqueueEvents._trim_legacy_extension_event_parameters(event)
                            CollectTelemetryEventsHandler.add_common_params_to_telemetry_event(event,
                                                                                               event_file_creation_time)
                        else:
                            _CollectAndEnqueueEvents._update_legacy_agent_event(event,
                                                                                event_file_creation_time)

                    self._send_telemetry_events_handler.enqueue_event(event)
                finally:
                    # Todo: We should delete files after ensuring that we sent the data to Wireserver successfully
                    # from our end rather than deleting first and sending later. This is to ensure the data reliability
                    # of the agent telemetry pipeline.
                    if os.path.exists(event_file_path):
                        os.remove(event_file_path)
            except ServiceStoppedError as stopped_error:
                logger.error(
                    "Unable to enqueue events as service stopped: {0}, skipping events collection".format(
                        ustr(stopped_error)))
                # Stop here: going on would read and delete every remaining file only to fail enqueueing
                # it, losing those events and logging one error per file.
                break
            except UnicodeError as uni_err:
                debug_info.update_unicode_error(uni_err)
            except Exception as error:
                debug_info.update_op_error(error)

        debug_info.report_debug_info()

    @staticmethod
    def _read_and_parse_event_file(event_file_path):
        """
        Read the event file and parse it to a TelemetryEvent object.
        :param event_file_path: Full path of the event file.
        :return: The result of parse_event on the decoded file content.
        :raises EventError: if reading or parsing fails with an error that is not retried.
        """
        # Retry for reading the event file in case file is modified while reading
        # We catch file-not-found errors and ValueError to handle the case where the file is deleted or modified
        # while reading; once the retries are exhausted the last of those errors is re-raised as is
        error_count = 0
        while True:
            try:
                with open(event_file_path, "rb") as event_fd:
                    event_data = event_fd.read().decode("utf-8")

                return parse_event(event_data)
            except Exception as e:
                if is_file_not_found_error(e) or isinstance(e, ValueError):
                    error_count += 1
                    if error_count >= NUM_OF_EVENT_FILE_RETRIES:
                        raise
                else:
                    raise EventError("Error parsing event: {0}".format(ustr(e)))
            time.sleep(EVENT_FILE_RETRY_DELAY)

    @staticmethod
    def _update_legacy_agent_event(event, event_creation_time):
        """
        Append to the event the common parameters that it does not already have.
        :param event: Legacy agent event; its parameters list is extended in place.
        :param event_creation_time: Time passed on when computing the common parameters.
        """
        # Ensure that if an agent event is missing a field from the schema defined since 2.2.47, the missing fields
        # will be appended, ensuring the event schema is complete before the event is reported.
        new_event = TelemetryEvent()
        new_event.parameters = []
        CollectTelemetryEventsHandler.add_common_params_to_telemetry_event(new_event, event_creation_time)

        event_params = dict([(param.name, param.value) for param in event.parameters])
        new_event_params = dict([(param.name, param.value) for param in new_event.parameters])

        missing_params = set(new_event_params.keys()).difference(set(event_params.keys()))
        params_to_add = []
        for param_name in missing_params:
            params_to_add.append(TelemetryEventParam(param_name, new_event_params[param_name]))

        event.parameters.extend(params_to_add)

    @staticmethod
    def _trim_legacy_extension_event_parameters(event):
        """
        This method is called for extension events before they are sent out. Per the agreement with extension
        publishers, the parameters that belong to extensions and will be reported intact are Name, Version, Operation,
        OperationSuccess, Message, and Duration. Since there is nothing preventing extensions to instantiate other
        fields (which belong to the agent), we call this method to ensure the rest of the parameters are trimmed since
        they will be replaced with values coming from the agent.
        :param event: Extension event to trim; its parameters list is replaced in place (nothing is returned).
        """
        params_to_keep = dict.fromkeys([
            GuestAgentExtensionEventsSchema.Name,
            GuestAgentExtensionEventsSchema.Version,
            GuestAgentExtensionEventsSchema.Operation,
            GuestAgentExtensionEventsSchema.OperationSuccess,
            GuestAgentExtensionEventsSchema.Message,
            GuestAgentExtensionEventsSchema.Duration
        ])
        trimmed_params = []

        for param in event.parameters:
            if param.name in params_to_keep:
                trimmed_params.append(param)

        event.parameters = trimmed_params
class CollectTelemetryEventsHandler(ThreadHandlerInterface):
    """
    This Handler takes care of fetching the Extension Telemetry events from the {extension_events_dir} and sends it to
    Kusto for advanced debuggability.
    """

    _THREAD_NAME = "TelemetryEventsCollector"

    def __init__(self, send_telemetry_events_handler):
        """
        :param send_telemetry_events_handler: Handler given to the periodic operations run by this thread.
        """
        self._send_telemetry_events_handler = send_telemetry_events_handler
        self.thread = None
        self.should_run = True

    @staticmethod
    def get_thread_name():
        """
        :return: Name given to the collector thread.
        """
        return CollectTelemetryEventsHandler._THREAD_NAME

    def run(self):
        """
        Log the start of the service and launch the collector thread.
        """
        logger.info("Start Extension Telemetry service.")
        self.start()

    def is_alive(self):
        """
        :return: True if the collector thread was created and is still running.
        """
        if self.thread is None:
            return False
        return self.thread.is_alive()

    def start(self):
        """
        Create the collector thread as a daemon thread and start it.
        """
        collector_thread = threading.Thread(target=self.daemon)
        collector_thread.daemon = True
        collector_thread.name = CollectTelemetryEventsHandler.get_thread_name()
        self.thread = collector_thread
        collector_thread.start()

    def stop(self):
        """
        Ask the main loop to exit and, if the thread is running, wait for it to finish.
        """
        self.should_run = False
        if not self.is_alive():
            return
        self.thread.join()

    def stopped(self):
        """
        :return: True once the handler has been asked to stop.
        """
        return not self.should_run

    def daemon(self):
        """
        Thread body: run the periodic operations until the handler is stopped.
        """
        send_handler = self._send_telemetry_events_handler
        periodic_operations = [_CollectAndEnqueueEvents(send_handler)]

        # The extension events operation only runs when the Extension Telemetry Pipeline feature is supported
        etp_feature = get_supported_feature_by_name(SupportedFeatureNames.ExtensionTelemetryPipeline)
        is_etp_enabled = etp_feature.is_supported
        logger.info("Extension Telemetry pipeline enabled: {0}".format(is_etp_enabled))
        if is_etp_enabled:
            periodic_operations.append(_ProcessExtensionEvents(send_handler))

        logger.info("Successfully started the {0} thread".format(self.get_thread_name()))

        while not self.stopped():
            try:
                for operation in periodic_operations:
                    operation.run()
            except Exception as error:
                logger.warn(
                    "An error occurred in the Telemetry Extension thread main loop; will skip the current iteration.\n{0}",
                    ustr(error))
            finally:
                # Sleep whether or not the iteration failed
                PeriodicOperation.sleep_until_next_operation(periodic_operations)

    @staticmethod
    def add_common_params_to_telemetry_event(event, event_time):
        """
        Add the common event parameters to the given event using the event logger.
        :param event: Event to update.
        :param event_time: Time passed on to the event logger along with the event.
        """
        get_event_logger().add_common_event_parameters(event, event_time)