Diagnostic/Utils/LadDiagnosticUtil.py (60 lines of code) (raw):
#!/usr/bin/env python
#
# Azure Linux extension
#
# Copyright (c) Microsoft Corporation
# All rights reserved.
# MIT License
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the ""Software""), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above
# copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software. THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
# LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
# CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
# Get elements from DiagnosticsMonitorConfiguration in LadCfg based on element name
def getDiagnosticsMonitorConfigurationElement(ladCfg, elementName):
if ladCfg and 'diagnosticMonitorConfiguration' in ladCfg:
if elementName in ladCfg['diagnosticMonitorConfiguration']:
return ladCfg['diagnosticMonitorConfiguration'][elementName]
return None
# Get fileCfg form FileLogs in LadCfg
def getFileCfgFromLadCfg(ladCfg):
fileLogs = getDiagnosticsMonitorConfigurationElement(ladCfg, 'fileLogs')
if fileLogs and 'fileLogConfiguration' in fileLogs:
return fileLogs['fileLogConfiguration']
return None
# Get resource Id from LadCfg
def getResourceIdFromLadCfg(ladCfg):
metricsConfiguration = getDiagnosticsMonitorConfigurationElement(ladCfg, 'metrics')
if metricsConfiguration and 'resourceId' in metricsConfiguration:
return metricsConfiguration['resourceId']
return None
# Get event volume from LadCfg
def getEventVolumeFromLadCfg(ladCfg):
return getDiagnosticsMonitorConfigurationElement(ladCfg, 'eventVolume')
# Get default sample rate from LadCfg
def getDefaultSampleRateFromLadCfg(ladCfg):
if ladCfg and 'sampleRateInSeconds' in ladCfg:
return ladCfg['sampleRateInSeconds']
return None
def getPerformanceCounterCfgFromLadCfg(ladCfg):
"""
Return the array of metric definitions
:param ladCfg:
:return: array of metric definitions
"""
performanceCounters = getDiagnosticsMonitorConfigurationElement(ladCfg, 'performanceCounters')
if performanceCounters and 'performanceCounterConfiguration' in performanceCounters:
return performanceCounters['performanceCounterConfiguration']
return None
def getAggregationPeriodsFromLadCfg(ladCfg):
"""
Return an array of aggregation periods as specified. If nothing appears in the config, default PT1H
:param ladCfg:
:return: array of ISO 8601 intervals
:rtype: List(str)
"""
results = []
metrics = getDiagnosticsMonitorConfigurationElement(ladCfg, 'metrics')
if metrics and 'metricAggregation' in metrics:
for item in metrics['metricAggregation']:
if 'scheduledTransferPeriod' in item:
# assert isinstance(item['scheduledTransferPeriod'], str)
results.append(item['scheduledTransferPeriod'])
return results
def getSinkList(feature_config):
"""
Returns the list of sink names to which all data should be forwarded, according to this config
:param feature_config: The JSON config for a feature (e.g. the struct for "performanceCounters" or "syslogEvents")
:return: the list of names; might be an empty list
:rtype: [str]
"""
if feature_config and 'sinks' in feature_config and feature_config['sinks']:
return [sink_name.strip() for sink_name in feature_config['sinks'].split(',')]
return []
def getFeatureWideSinksFromLadCfg(ladCfg, feature_name):
"""
Returns the list of sink names to which all data for the given feature should be forwarded
:param ladCfg: The ladCfg JSON config
:param str feature_name: Name of the feature. Expected to be "performanceCounters" or "syslogEvents"
:return: the list of names; might be an empty list
:rtype: [str]
"""
return getSinkList(getDiagnosticsMonitorConfigurationElement(ladCfg, feature_name))
class SinkConfiguration:
def __init__(self):
self._sinks = {}
def insert_from_config(self, json):
"""
Walk through the sinksConfig JSON object and add all sinks within it. Every accepted sink is guaranteed to
have a 'name' and 'type' element.
:param json: A hash holding the body of a sinksConfig object
:return: A string containing warning messages, or an empty string
"""
msgs = []
if json and 'sink' in json:
for sink in json['sink']:
if 'name' in sink and 'type' in sink:
self._sinks[sink['name']] = sink
else:
msgs.append('Ignoring invalid sink definition {0}'.format(sink))
return '\n'.join(msgs)
def get_sink_by_name(self, sink_name):
"""
Return the JSON object defining a particular sink.
:param sink_name: string name of sink
:return: JSON object or None
"""
if sink_name in self._sinks:
return self._sinks[sink_name]
return None
def get_all_sink_names(self):
"""
Return a list of all names of defined sinks.
:return: list of names
"""
return self._sinks.keys()
def get_sinks_by_type(self, sink_type):
"""
Return a list of all names of defined sinks.
:return: list of names
"""
return [self._sinks[name] for name in self._sinks if self._sinks[name]['type'] == sink_type]