# azurelinuxagent/common/event.py
# Copyright 2018 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
#
import atexit
import json
import os
import platform
import re
import sys
import threading
import time
import traceback
from datetime import datetime
import azurelinuxagent.common.conf as conf
import azurelinuxagent.common.logger as logger
from azurelinuxagent.common.AgentGlobals import AgentGlobals
from azurelinuxagent.common.exception import EventError, OSUtilError
from azurelinuxagent.common.future import ustr
from azurelinuxagent.common.datacontract import get_properties, set_properties
from azurelinuxagent.common.osutil import get_osutil
from azurelinuxagent.common.telemetryevent import TelemetryEventParam, TelemetryEvent, CommonTelemetryEventSchema, \
GuestAgentGenericLogsSchema, GuestAgentExtensionEventsSchema, GuestAgentPerfCounterEventsSchema
from azurelinuxagent.common.utils import fileutil, textutil
from azurelinuxagent.common.utils.textutil import parse_doc, findall, find, getattrib, str_to_encoded_ustr
from azurelinuxagent.common.version import CURRENT_VERSION, CURRENT_AGENT, AGENT_NAME, DISTRO_NAME, DISTRO_VERSION, DISTRO_CODE_NAME, AGENT_EXECUTION_MODE
from azurelinuxagent.common.protocol.imds import get_imds_client
EVENTS_DIRECTORY = "events"
_EVENT_MSG = "Event: name={0}, op={1}, message={2}, duration={3}"
TELEMETRY_EVENT_PROVIDER_ID = "69B669B9-4AF8-4C50-BDC4-6006FA76E975"
TELEMETRY_EVENT_EVENT_ID = 1
TELEMETRY_METRICS_EVENT_ID = 4
TELEMETRY_LOG_PROVIDER_ID = "FFF0196F-EE4C-4EAF-9AA5-776F622DEB4F"
TELEMETRY_LOG_EVENT_ID = 7
#
# When this flag is enabled the TODO comment in Logger.log() needs to be addressed; also the tests
# marked with "Enable this test when SEND_LOGS_TO_TELEMETRY is enabled" should be enabled.
#
SEND_LOGS_TO_TELEMETRY = False
MAX_NUMBER_OF_EVENTS = 1000
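# Events saved by the agent itself are written with the '.waagent.tld' extension (see
# EventLogger.save_event); EVENT_FILE_REGEX matches any '.tld' event file in the events
# directory and captures the optional '.waagent' prefix as 'agent_event' so consumers of
# these files can distinguish the two kinds.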
AGENT_EVENT_FILE_EXTENSION = '.waagent.tld'
EVENT_FILE_REGEX = re.compile(r'(?P<agent_event>\.waagent)?\.tld$')
def send_logs_to_telemetry():
return SEND_LOGS_TO_TELEMETRY
class WALAEventOperation:
ActivateResourceDisk = "ActivateResourceDisk"
AgentBlacklisted = "AgentBlacklisted"
AgentEnabled = "AgentEnabled"
AgentMemory = "AgentMemory"
AgentUpgrade = "AgentUpgrade"
ArtifactsProfileBlob = "ArtifactsProfileBlob"
CGroupsCleanUp = "CGroupsCleanUp"
CGroupsDisabled = "CGroupsDisabled"
CGroupsInfo = "CGroupsInfo"
CloudInit = "CloudInit"
CollectEventErrors = "CollectEventErrors"
CollectEventUnicodeErrors = "CollectEventUnicodeErrors"
ConfigurationChange = "ConfigurationChange"
CustomData = "CustomData"
DefaultChannelChange = "DefaultChannelChange"
Deploy = "Deploy"
Disable = "Disable"
Downgrade = "Downgrade"
Download = "Download"
Enable = "Enable"
ExtensionHandlerManifest = "ExtensionHandlerManifest"
ExtensionPolicy = "ExtensionPolicy"
ExtensionProcessing = "ExtensionProcessing"
ExtensionTelemetryEventProcessing = "ExtensionTelemetryEventProcessing"
FetchGoalState = "FetchGoalState"
Firewall = "Firewall"
GoalState = "GoalState"
GoalStateUnsupportedFeatures = "GoalStateUnsupportedFeatures"
HealthCheck = "HealthCheck"
HealthObservation = "HealthObservation"
HeartBeat = "HeartBeat"
HostnamePublishing = "HostnamePublishing"
HostPlugin = "HostPlugin"
HostPluginHeartbeat = "HostPluginHeartbeat"
HostPluginHeartbeatExtended = "HostPluginHeartbeatExtended"
HttpErrors = "HttpErrors"
HttpGet = "HttpGet"
ImdsHeartbeat = "ImdsHeartbeat"
Install = "Install"
InitializeHostPlugin = "InitializeHostPlugin"
Log = "Log"
LogCollection = "LogCollection"
NoExec = "NoExec"
OSInfo = "OSInfo"
OpenSsl = "OpenSsl"
PersistFirewallRules = "PersistFirewallRules"
Policy = "Policy"
ProvisionAfterExtensions = "ProvisionAfterExtensions"
PluginSettingsVersionMismatch = "PluginSettingsVersionMismatch"
InvalidExtensionConfig = "InvalidExtensionConfig"
Provision = "Provision"
ProvisionGuestAgent = "ProvisionGuestAgent"
RemoteAccessHandling = "RemoteAccessHandling"
ReportEventErrors = "ReportEventErrors"
ReportEventUnicodeErrors = "ReportEventUnicodeErrors"
ReportStatus = "ReportStatus"
ReportStatusExtended = "ReportStatusExtended"
RequestedStateDisabled = "RequestedStateDisabled"
RequestedVersionMismatch = "RequestedVersionMismatch"
ResetFirewall = "ResetFirewall"
Restart = "Restart"
SetCGroupsLimits = "SetCGroupsLimits"
SkipUpdate = "SkipUpdate"
StatusProcessing = "StatusProcessing"
UnhandledError = "UnhandledError"
UnInstall = "UnInstall"
Unknown = "Unknown"
Update = "Update"
VmSettings = "VmSettings"
VmSettingsSummary = "VmSettingsSummary"
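# Messages for the operations listed below may carry extension stdout/stderr; they are the only
# operations whose messages _encode_message() (below) will gzip and base64 encode.
# SHOULD_ENCODE_MESSAGE_LEN is presumably the length threshold consulted by callers outside this
# file when deciding whether to encode; it is not referenced here.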
SHOULD_ENCODE_MESSAGE_LEN = 80
SHOULD_ENCODE_MESSAGE_OP = [
WALAEventOperation.Disable,
WALAEventOperation.Enable,
WALAEventOperation.Install,
WALAEventOperation.UnInstall,
]
class EventStatus(object):
EVENT_STATUS_FILE = "event_status.json"
def __init__(self):
self._path = None
self._status = {}
def clear(self):
self._status = {}
self._save()
def event_marked(self, name, version, op):
return self._event_name(name, version, op) in self._status
def event_succeeded(self, name, version, op):
event = self._event_name(name, version, op)
if event not in self._status:
return True
return self._status[event] is True
def initialize(self, status_dir=conf.get_lib_dir()):
self._path = os.path.join(status_dir, EventStatus.EVENT_STATUS_FILE)
self._load()
def mark_event_status(self, name, version, op, status):
event = self._event_name(name, version, op)
self._status[event] = (status is True)
self._save()
def _event_name(self, name, version, op):
return "{0}-{1}-{2}".format(name, version, op)
def _load(self):
try:
self._status = {}
if os.path.isfile(self._path):
with open(self._path, 'r') as f:
self._status = json.load(f)
except Exception as e:
logger.warn("Exception occurred loading event status: {0}".format(e))
self._status = {}
def _save(self):
try:
with open(self._path, 'w') as f:
json.dump(self._status, f)
except Exception as e:
logger.warn("Exception occurred saving event status: {0}".format(e))
__event_status__ = EventStatus()
__event_status_operations__ = [
WALAEventOperation.ReportStatus
]
def parse_json_event(data_str):
data = json.loads(data_str)
event = TelemetryEvent()
set_properties("TelemetryEvent", event, data)
event.file_type = "json"
return event
def parse_event(data_str):
try:
return parse_json_event(data_str)
except ValueError:
return parse_xml_event(data_str)
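# A minimal usage sketch (hypothetical caller; the file path and read logic are illustrative only):
#
#     with open(event_file_path, "rb") as fh:       # e.g. a '*.tld' file under the events directory
#         event = parse_event(fh.read().decode("utf-8", errors="ignore"))
#     print(event.eventId, len(event.parameters))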
def parse_xml_param(param_node):
name = getattrib(param_node, "Name")
value_str = getattrib(param_node, "Value")
attr_type = getattrib(param_node, "T")
value = value_str
if attr_type == 'mt:uint64':
value = int(value_str)
elif attr_type == 'mt:bool':
value = bool(value_str)
elif attr_type == 'mt:float64':
value = float(value_str)
return TelemetryEventParam(name, value)
def parse_xml_event(data_str):
try:
xml_doc = parse_doc(data_str)
event_id = getattrib(find(xml_doc, "Event"), 'id')
provider_id = getattrib(find(xml_doc, "Provider"), 'id')
event = TelemetryEvent(event_id, provider_id)
param_nodes = findall(xml_doc, 'Param')
for param_node in param_nodes:
event.parameters.append(parse_xml_param(param_node))
event.file_type = "xml"
return event
except Exception as e:
raise ValueError(ustr(e))
def _encode_message(op, message):
"""
Gzip and base64 encode a message based on the operation.
The intent of this message is to make the logs human readable and include the
stdout/stderr from extension operations. Extension operations tend to generate
a lot of noise, which makes it difficult to parse the line-oriented waagent.log.
The compromise is to encode the stdout/stderr so we preserve the data and do
not destroy the line oriented nature.
The data can be recovered using the following command:
$ echo '<encoded data>' | base64 -d | pigz -zd
You may need to install the pigz command.
:param op: Operation, e.g. Enable or Install
:param message: Message to encode
:return: gzip'ed and base64 encoded message, or the original message
"""
if len(message) == 0:
return message
if op not in SHOULD_ENCODE_MESSAGE_OP:
return message
try:
return textutil.compress(message)
except Exception:
# If the message could not be encoded a dummy message ('<>') is returned.
# The original message was still sent via telemetry, so all is not lost.
return "<>"
def _log_event(name, op, message, duration, is_success=True):
global _EVENT_MSG # pylint: disable=W0602, W0603
if not is_success:
logger.error(_EVENT_MSG, name, op, message, duration)
else:
logger.info(_EVENT_MSG, name, op, message, duration)
class CollectOrReportEventDebugInfo(object):
"""
    This class captures and reports debug info gathered while collecting events and reporting them to the
    wire server: the count of unicode errors and other unexpected errors, plus a small subset of those
    errors with stack traces to help debug potential issues.
"""
__MAX_ERRORS_TO_REPORT = 5
OP_REPORT = "Report"
OP_COLLECT = "Collect"
def __init__(self, operation=OP_REPORT):
self.__unicode_error_count = 0
self.__unicode_errors = set()
self.__op_error_count = 0
self.__op_errors = set()
if operation == self.OP_REPORT:
self.__unicode_error_event = WALAEventOperation.ReportEventUnicodeErrors
self.__op_errors_event = WALAEventOperation.ReportEventErrors
elif operation == self.OP_COLLECT:
self.__unicode_error_event = WALAEventOperation.CollectEventUnicodeErrors
self.__op_errors_event = WALAEventOperation.CollectEventErrors
def report_debug_info(self):
def report_dropped_events_error(count, errors, operation_name):
err_msg_format = "DroppedEventsCount: {0}\nReasons (first {1} errors): {2}"
if count > 0:
add_event(op=operation_name,
message=err_msg_format.format(count, CollectOrReportEventDebugInfo.__MAX_ERRORS_TO_REPORT, ', '.join(errors)),
is_success=False)
report_dropped_events_error(self.__op_error_count, self.__op_errors, self.__op_errors_event)
report_dropped_events_error(self.__unicode_error_count, self.__unicode_errors, self.__unicode_error_event)
@staticmethod
def _update_errors_and_get_count(error_count, errors, error_msg):
error_count += 1
if len(errors) < CollectOrReportEventDebugInfo.__MAX_ERRORS_TO_REPORT:
errors.add("{0}: {1}".format(ustr(error_msg), traceback.format_exc()))
return error_count
def update_unicode_error(self, unicode_err):
self.__unicode_error_count = self._update_errors_and_get_count(self.__unicode_error_count, self.__unicode_errors,
unicode_err)
def update_op_error(self, op_err):
self.__op_error_count = self._update_errors_and_get_count(self.__op_error_count, self.__op_errors, op_err)
def get_error_count(self):
return self.__op_error_count + self.__unicode_error_count
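# A typical (hypothetical) use of CollectOrReportEventDebugInfo while processing event files:
#
#     debug_info = CollectOrReportEventDebugInfo(operation=CollectOrReportEventDebugInfo.OP_COLLECT)
#     for event_file in event_files:                  # event_files and process() are illustrative
#         try:
#             process(event_file)
#         except UnicodeError as e:
#             debug_info.update_unicode_error(e)
#         except Exception as e:
#             debug_info.update_op_error(e)
#     debug_info.report_debug_info()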
class EventLogger(object):
def __init__(self):
self.event_dir = None
self.periodic_events = {}
self.protocol = None
#
# All events should have these parameters.
#
# The first set comes from the current OS and is initialized here. These values don't change during
# the agent's lifetime.
#
# The next two sets come from the goal state and IMDS and must be explicitly initialized using
# initialize_vminfo_common_parameters() once a protocol for communication with the host has been
# created. Their values don't change during the agent's lifetime. Note that we initialize these
# parameters here using dummy values (*_UNINITIALIZED) since events sent to the host should always
# match the schema defined for them in the telemetry pipeline.
#
# There is another set of common parameters that must be computed at the time the event is created
# (e.g. the timestamp and the container ID); those are added to events (along with the parameters
        # below) in add_common_event_parameters().
#
# Note that different kinds of events may also include other parameters; those are added by the
# corresponding add_* method (e.g. add_metric for performance metrics).
#
self._common_parameters = []
# Parameters from OS
osutil = get_osutil()
keyword_name = {
"CpuArchitecture": osutil.get_vm_arch()
}
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.OSVersion, EventLogger._get_os_version()))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ExecutionMode, AGENT_EXECUTION_MODE))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RAM, int(EventLogger._get_ram(osutil))))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.Processors, int(EventLogger._get_processors(osutil))))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.KeywordName, json.dumps(keyword_name)))
# Parameters from goal state
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.TenantName, "TenantName_UNINITIALIZED"))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RoleName, "RoleName_UNINITIALIZED"))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.RoleInstanceName, "RoleInstanceName_UNINITIALIZED"))
        # Parameters from IMDS
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.Location, "Location_UNINITIALIZED"))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.SubscriptionId, "SubscriptionId_UNINITIALIZED"))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ResourceGroupName, "ResourceGroupName_UNINITIALIZED"))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.VMId, "VMId_UNINITIALIZED"))
self._common_parameters.append(TelemetryEventParam(CommonTelemetryEventSchema.ImageOrigin, 0))
@staticmethod
def _get_os_version():
return "{0}:{1}-{2}-{3}:{4}".format(platform.system(), DISTRO_NAME, DISTRO_VERSION, DISTRO_CODE_NAME, platform.release())
@staticmethod
def _get_ram(osutil):
try:
return osutil.get_total_mem()
except OSUtilError as e:
logger.warn("Failed to get RAM info; will be missing from telemetry: {0}", ustr(e))
return 0
@staticmethod
def _get_processors(osutil):
try:
return osutil.get_processor_cores()
except OSUtilError as e:
logger.warn("Failed to get Processors info; will be missing from telemetry: {0}", ustr(e))
return 0
def initialize_vminfo_common_parameters(self, protocol):
"""
Initializes the common parameters that come from the goal state and IMDS
"""
# create an index of the event parameters for faster updates
parameters = {}
for p in self._common_parameters:
parameters[p.name] = p
try:
vminfo = protocol.get_vminfo()
parameters[CommonTelemetryEventSchema.TenantName].value = vminfo.tenantName
parameters[CommonTelemetryEventSchema.RoleName].value = vminfo.roleName
parameters[CommonTelemetryEventSchema.RoleInstanceName].value = vminfo.roleInstanceName
except Exception as e:
logger.warn("Failed to get VM info from goal state; will be missing from telemetry: {0}", ustr(e))
try:
imds_client = get_imds_client()
imds_info = imds_client.get_compute()
parameters[CommonTelemetryEventSchema.Location].value = imds_info.location
parameters[CommonTelemetryEventSchema.SubscriptionId].value = imds_info.subscriptionId
parameters[CommonTelemetryEventSchema.ResourceGroupName].value = imds_info.resourceGroupName
parameters[CommonTelemetryEventSchema.VMId].value = imds_info.vmId
parameters[CommonTelemetryEventSchema.ImageOrigin].value = int(imds_info.image_origin)
except Exception as e:
logger.warn("Failed to get IMDS info; will be missing from telemetry: {0}", ustr(e))
def save_event(self, data):
if self.event_dir is None:
logger.warn("Cannot save event -- Event reporter is not initialized.")
return
try:
fileutil.mkdir(self.event_dir, mode=0o700)
except (IOError, OSError) as e:
msg = "Failed to create events folder {0}. Error: {1}".format(self.event_dir, ustr(e))
raise EventError(msg)
try:
existing_events = os.listdir(self.event_dir)
if len(existing_events) >= MAX_NUMBER_OF_EVENTS:
logger.periodic_warn(logger.EVERY_MINUTE, "[PERIODIC] Too many files under: {0}, current count: {1}, "
"removing oldest event files".format(self.event_dir,
len(existing_events)))
existing_events.sort()
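                # Keep only the 999 most recent files so that, after the new event is written,
                # the directory stays at or below MAX_NUMBER_OF_EVENTS.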
oldest_files = existing_events[:-999]
for event_file in oldest_files:
os.remove(os.path.join(self.event_dir, event_file))
except (IOError, OSError) as e:
msg = "Failed to remove old events from events folder {0}. Error: {1}".format(self.event_dir, ustr(e))
raise EventError(msg)
filename = os.path.join(self.event_dir,
ustr(int(time.time() * 1000000)))
try:
with open(filename + ".tmp", 'wb+') as hfile:
hfile.write(data.encode("utf-8"))
os.rename(filename + ".tmp", filename + AGENT_EVENT_FILE_EXTENSION)
except (IOError, OSError) as e:
msg = "Failed to write events to file: {0}".format(e)
raise EventError(msg)
def reset_periodic(self):
self.periodic_events = {}
def is_period_elapsed(self, delta, h):
return h not in self.periodic_events or \
(self.periodic_events[h] + delta) <= datetime.now()
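    # add_periodic() emits the event at most once per 'delta' interval (unless 'force' is set);
    # duplicate suppression keys on a hash of name, op, is_success and message.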
def add_periodic(self, delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0,
version=str(CURRENT_VERSION), message="", log_event=True, force=False):
h = hash(name + op + ustr(is_success) + message)
if force or self.is_period_elapsed(delta, h):
self.add_event(name, op=op, is_success=is_success, duration=duration,
version=version, message=message, log_event=log_event)
self.periodic_events[h] = datetime.now()
def add_event(self, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, flush=False):
"""
:param flush: Flush the event immediately to the wire server
"""
if (not is_success) and log_event:
_log_event(name, op, message, duration, is_success=is_success)
event = TelemetryEvent(TELEMETRY_EVENT_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID)
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Name, str_to_encoded_ustr(name)))
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Version, str_to_encoded_ustr(version)))
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Operation, str_to_encoded_ustr(op)))
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.OperationSuccess, bool(is_success)))
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Message, str_to_encoded_ustr(message)))
event.parameters.append(TelemetryEventParam(GuestAgentExtensionEventsSchema.Duration, int(duration)))
self.add_common_event_parameters(event, datetime.utcnow())
self.report_or_save_event(event, flush)
def add_log_event(self, level, message):
event = TelemetryEvent(TELEMETRY_LOG_EVENT_ID, TELEMETRY_LOG_PROVIDER_ID)
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.EventName, WALAEventOperation.Log))
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.CapabilityUsed, logger.LogLevel.STRINGS[level]))
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context1, str_to_encoded_ustr(self._clean_up_message(message))))
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context2, datetime.utcnow().strftime(logger.Logger.LogTimeFormatInUTC)))
event.parameters.append(TelemetryEventParam(GuestAgentGenericLogsSchema.Context3, ''))
self.add_common_event_parameters(event, datetime.utcnow())
self.report_or_save_event(event)
def add_metric(self, category, counter, instance, value, log_event=False):
"""
        Create and save a telemetry event for a single performance metric.
:param str category: The category of metric (e.g. "cpu", "memory")
:param str counter: The specific metric within the category (e.g. "%idle")
:param str instance: For instanced metrics, the instance identifier (filesystem name, cpu core#, etc.)
:param value: Value of the metric
:param bool log_event: If true, log the collected metric in the agent log
"""
if log_event:
message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
_log_event(AGENT_NAME, "METRIC", message, 0)
event = TelemetryEvent(TELEMETRY_METRICS_EVENT_ID, TELEMETRY_EVENT_PROVIDER_ID)
event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Category, str_to_encoded_ustr(category)))
event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Counter, str_to_encoded_ustr(counter)))
event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Instance, str_to_encoded_ustr(instance)))
event.parameters.append(TelemetryEventParam(GuestAgentPerfCounterEventsSchema.Value, float(value)))
self.add_common_event_parameters(event, datetime.utcnow())
self.report_or_save_event(event)
def report_or_save_event(self, event, flush=False):
"""
        Flush the event to the wire server if 'flush' is set to true; otherwise (or if the
        immediate send fails) save it to disk so it can be collected and sent later.
        TODO: pick up as many events as possible and send them in one go.
"""
report_success = False
if flush and self.protocol is not None:
report_success = self.protocol.report_event([event], flush)
if not report_success:
try:
data = get_properties(event)
self.save_event(json.dumps(data))
except EventError as e:
logger.periodic_error(logger.EVERY_FIFTEEN_MINUTES, "[PERIODIC] {0}".format(ustr(e)))
@staticmethod
def _clean_up_message(message):
# By the time the message has gotten to this point it is formatted as
#
# Old Time format
# YYYY/MM/DD HH:mm:ss.fffffff LEVEL <text>.
# YYYY/MM/DD HH:mm:ss.fffffff <text>.
# YYYY/MM/DD HH:mm:ss LEVEL <text>.
# YYYY/MM/DD HH:mm:ss <text>.
#
# UTC ISO Time format added in #1716
# YYYY-MM-DDTHH:mm:ss.fffffffZ LEVEL <text>.
# YYYY-MM-DDTHH:mm:ss.fffffffZ <text>.
# YYYY-MM-DDTHH:mm:ssZ LEVEL <text>.
# YYYY-MM-DDTHH:mm:ssZ <text>.
#
# The timestamp and the level are redundant, and should be stripped. The logging library does not schematize
# this data, so I am forced to parse the message using a regex. The format is regular, so the burden is low,
# and usability on the telemetry side is high.
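        # For example, "2021-01-01T00:00:00.000000Z INFO Daemon Starting..." becomes "Daemon Starting...",
        # and "2021/01/01 00:00:00.000000 Some message." becomes "Some message.".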
if not message:
return message
        # Two regexes keep the handling of these formats simple and maintainable. Most log lines include the
        # level in the message itself; for those that do not, the second regex acts as a catch-all.
log_level_format_parser = re.compile(r"^.*(INFO|WARNING|ERROR|VERBOSE)\s*(.*)$")
log_format_parser = re.compile(r"^[0-9:/\-TZ\s.]*\s(.*)$")
# Parsing the log messages containing levels in it
extract_level_message = log_level_format_parser.search(message)
if extract_level_message:
return extract_level_message.group(2) # The message bit
else:
# Parsing the log messages without levels in it.
extract_message = log_format_parser.search(message)
if extract_message:
return extract_message.group(1) # The message bit
else:
return message
def add_common_event_parameters(self, event, event_timestamp):
"""
This method is called for all events and ensures all telemetry fields are added before the event is sent out.
Note that the event timestamp is saved in the OpcodeName field.
"""
common_params = [TelemetryEventParam(CommonTelemetryEventSchema.GAVersion, CURRENT_AGENT),
TelemetryEventParam(CommonTelemetryEventSchema.ContainerId, AgentGlobals.get_container_id()),
TelemetryEventParam(CommonTelemetryEventSchema.OpcodeName, event_timestamp.strftime(logger.Logger.LogTimeFormatInUTC)),
TelemetryEventParam(CommonTelemetryEventSchema.EventTid, threading.current_thread().ident),
TelemetryEventParam(CommonTelemetryEventSchema.EventPid, os.getpid()),
TelemetryEventParam(CommonTelemetryEventSchema.TaskName, threading.current_thread().name)]
if event.eventId == TELEMETRY_EVENT_EVENT_ID and event.providerId == TELEMETRY_EVENT_PROVIDER_ID:
            # Currently only the GuestAgentExtensionEvents table has these columns; the other tables don't,
            # so this data is skipped for them.
common_params.extend([TelemetryEventParam(GuestAgentExtensionEventsSchema.ExtensionType, event.file_type),
TelemetryEventParam(GuestAgentExtensionEventsSchema.IsInternal, False)])
event.parameters.extend(common_params)
event.parameters.extend(self._common_parameters)
__event_logger__ = EventLogger()
def get_event_logger():
return __event_logger__
def elapsed_milliseconds(utc_start):
now = datetime.utcnow()
if now < utc_start:
return 0
d = now - utc_start
return int(((d.days * 24 * 60 * 60 + d.seconds) * 1000) + \
(d.microseconds / 1000.0))
def report_event(op, is_success=True, message='', log_event=True, flush=False):
"""
:param flush: if true, flush the event immediately to the wire server
"""
add_event(AGENT_NAME,
version=str(CURRENT_VERSION),
is_success=is_success,
message=message,
op=op,
log_event=log_event, flush=flush)
def report_periodic(delta, op, is_success=True, message=''):
add_periodic(delta, AGENT_NAME,
version=str(CURRENT_VERSION),
is_success=is_success,
message=message,
op=op)
def report_metric(category, counter, instance, value, log_event=False, reporter=__event_logger__):
"""
Send a telemetry event reporting a single instance of a performance counter.
:param str category: The category of the metric (cpu, memory, etc)
:param str counter: The name of the metric ("%idle", etc)
:param str instance: For instanced metrics, the identifier of the instance. E.g. a disk drive name, a cpu core#
:param value: The value of the metric
:param bool log_event: If True, log the metric in the agent log as well
:param EventLogger reporter: The EventLogger instance to which metric events should be sent
"""
if reporter.event_dir is None:
logger.warn("Cannot report metric event -- Event reporter is not initialized.")
message = "Metric {0}/{1} [{2}] = {3}".format(category, counter, instance, value)
_log_event(AGENT_NAME, "METRIC", message, 0)
return
try:
reporter.add_metric(category, counter, instance, float(value), log_event)
except ValueError:
logger.periodic_warn(logger.EVERY_HALF_HOUR, "[PERIODIC] Cannot cast the metric value. Details of the Metric - "
"{0}/{1} [{2}] = {3}".format(category, counter, instance, value))
def initialize_event_logger_vminfo_common_parameters_and_protocol(protocol, reporter=__event_logger__):
    # Initialize the protocol for the event logger so it can send events directly to the wire server
reporter.protocol = protocol
reporter.initialize_vminfo_common_parameters(protocol)
def add_event(name=AGENT_NAME, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, flush=False, reporter=__event_logger__):
"""
:param flush: if true, flush the event immediately to the wire server
"""
if reporter.event_dir is None:
logger.warn("Cannot add event -- Event reporter is not initialized.")
_log_event(name, op, message, duration, is_success=is_success)
return
if should_emit_event(name, version, op, is_success):
mark_event_status(name, version, op, is_success)
reporter.add_event(name, op=op, is_success=is_success, duration=duration, version=str(version),
message=message,
log_event=log_event, flush=flush)
def info(op, fmt, *args):
"""
Creates a telemetry event and logs the message as INFO.
"""
logger.info(fmt, *args)
add_event(op=op, message=fmt.format(*args), is_success=True)
def warn(op, fmt, *args):
"""
Creates a telemetry event and logs the message as WARNING.
"""
logger.warn(fmt, *args)
add_event(op=op, message="[WARNING] " + fmt.format(*args), is_success=False, log_event=False)
def error(op, fmt, *args):
"""
Creates a telemetry event and logs the message as ERROR.
"""
logger.error(fmt, *args)
add_event(op=op, message=fmt.format(*args), is_success=False, log_event=False)
def add_log_event(level, message, forced=False, reporter=__event_logger__):
"""
    :param level: logger.LogLevel of the log event
    :param message: Message
    :param forced: Force write the event even if send_logs_to_telemetry() is disabled
        (NOTE: Remove this flag once send_logs_to_telemetry() is enabled for all events)
    :param reporter: The EventLogger instance to which the log event should be sent
:return:
"""
if reporter.event_dir is None:
return
if not (forced or send_logs_to_telemetry()):
return
if level >= logger.LogLevel.WARNING:
reporter.add_log_event(level, message)
def add_periodic(delta, name, op=WALAEventOperation.Unknown, is_success=True, duration=0, version=str(CURRENT_VERSION),
message="", log_event=True, force=False, reporter=__event_logger__):
if reporter.event_dir is None:
logger.warn("Cannot add periodic event -- Event reporter is not initialized.")
_log_event(name, op, message, duration, is_success=is_success)
return
reporter.add_periodic(delta, name, op=op, is_success=is_success, duration=duration, version=str(version),
message=message, log_event=log_event, force=force)
def mark_event_status(name, version, op, status):
if op in __event_status_operations__:
__event_status__.mark_event_status(name, version, op, status)
def should_emit_event(name, version, op, status):
return \
op not in __event_status_operations__ or \
__event_status__ is None or \
not __event_status__.event_marked(name, version, op) or \
__event_status__.event_succeeded(name, version, op) != status
def init_event_logger(event_dir):
__event_logger__.event_dir = event_dir
def init_event_status(status_dir):
__event_status__.initialize(status_dir)
def dump_unhandled_err(name):
if hasattr(sys, 'last_type') and hasattr(sys, 'last_value') and \
hasattr(sys, 'last_traceback'):
last_type = getattr(sys, 'last_type')
last_value = getattr(sys, 'last_value')
last_traceback = getattr(sys, 'last_traceback')
trace = traceback.format_exception(last_type, last_value,
last_traceback)
message = "".join(trace)
add_event(name, is_success=False, message=message,
op=WALAEventOperation.UnhandledError)
def enable_unhandled_err_dump(name):
atexit.register(dump_unhandled_err, name)
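# Typical initialization order for this module (a sketch based on the functions above, not a
# prescription of how the agent wires it up):
#
#     init_event_status(conf.get_lib_dir())
#     init_event_logger(os.path.join(conf.get_lib_dir(), EVENTS_DIRECTORY))
#     # once a protocol for talking to the host is available:
#     initialize_event_logger_vminfo_common_parameters_and_protocol(protocol)
#     add_event(op=WALAEventOperation.HeartBeat, message="agent is running")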