AzureEnhancedMonitor/ext/aem.py (1,187 lines of code) (raw):

# # Copyright 2014 Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import re import socket import traceback import time import datetime import psutil import urlparse import xml.dom.minidom as minidom from azure.storage import TableService, Entity from Utils.WAAgentUtil import waagent, AddExtensionEvent FAILED_TO_RETRIEVE_MDS_DATA="(03100)Failed to retrieve mds data" FAILED_TO_RETRIEVE_LOCAL_DATA="(03101)Failed to retrieve local data" FAILED_TO_RETRIEVE_STORAGE_DATA="(03102)Failed to retrieve storage data" FAILED_TO_SERIALIZE_PERF_COUNTERS="(03103)Failed to serialize perf counters" def timedelta_total_seconds(delta): if not hasattr(datetime.timedelta, 'total_seconds'): return delta.days * 86400 + delta.seconds else: return delta.total_seconds() def get_host_base_from_uri(blob_uri): uri = urlparse.urlparse(blob_uri) netloc = uri.netloc if netloc is None: return None return netloc[netloc.find('.'):] MonitoringIntervalInMinute = 1 #One minute MonitoringInterval = 60 * MonitoringIntervalInMinute #It takes sometime before the performance date reaches azure table. AzureTableDelayInMinute = 5 #Five minute AzureTableDelay = 60 * AzureTableDelayInMinute AzureEnhancedMonitorVersion = "2.0.0" LibDir = "/var/lib/AzureEnhancedMonitor" LatestErrorRecord = "LatestErrorRecord" def clearLastErrorRecord(): errFile = os.path.join(LibDir, LatestErrorRecord) if os.path.exists(errFile) and os.path.isfile(errFile): os.remove(errFile) def getLatestErrorRecord(): errFile=os.path.join(LibDir, LatestErrorRecord) if os.path.exists(errFile) and os.path.isfile(errFile): with open(errFile, 'r') as f: return f.read() return "0" def updateLatestErrorRecord(s): errFile = os.path.join(LibDir, LatestErrorRecord) maxRetry = 3 for i in range(0, maxRetry): try: with open(errFile, "w+") as F: F.write(s.encode("utf8")) return except IOError: time.sleep(1) waagent.Error(("Failed to serialize latest error record to file:" "{0}").format(errFile)) AddExtensionEvent(message="failed to write latest error record") raise def easyHash(s): """ MDSD used the following hash algorithm to cal a first part of partition key """ strHash = 0 multiplier = 37 for c in s: strHash = strHash * multiplier + ord(c) #Only keep the last 64bit, since the mod base is 100 strHash = strHash % (1<<64) return strHash % 100 #Assume eventVolume is Large Epoch = datetime.datetime(1, 1, 1) tickInOneSecond = 1000 * 10000 # 1s = 1000 * 10000 ticks def getMDSTimestamp(unixTimestamp): unixTime = datetime.datetime.utcfromtimestamp(unixTimestamp) startTimestamp = int(timedelta_total_seconds(unixTime - Epoch)) return startTimestamp * tickInOneSecond def getIdentity(): identity = socket.gethostname() return identity def getMDSPartitionKey(identity, timestamp): hashVal = easyHash(identity) return "{0:0>19d}___{1:0>19d}".format(hashVal, timestamp) def getAzureDiagnosticKeyRange(): #Round down by MonitoringInterval endTime = (int(time.time()) / MonitoringInterval) * MonitoringInterval endTime = endTime - AzureTableDelay startTime = endTime - MonitoringInterval identity = getIdentity() startKey = getMDSPartitionKey(identity, getMDSTimestamp(startTime)) endKey = getMDSPartitionKey(identity, getMDSTimestamp(endTime)) return startKey, endKey def getAzureDiagnosticCPUData(accountName, accountKey, hostBase, startKey, endKey, deploymentId): try: waagent.Log("Retrieve diagnostic data(CPU).") table = "LinuxCpuVer2v0" tableService = TableService(account_name = accountName, account_key = accountKey, host_base = hostBase) ofilter = ("PartitionKey ge '{0}' and PartitionKey lt '{1}' " "and DeploymentId eq '{2}'").format(startKey, endKey, deploymentId) oselect = ("PercentProcessorTime,DeploymentId") data = tableService.query_entities(table, ofilter, oselect, 1) if data is None or len(data) == 0: return None cpuPercent = float(data[0].PercentProcessorTime) return cpuPercent except Exception as e: waagent.Error((u"Failed to retrieve diagnostic data(CPU): {0} {1}" "").format(e, traceback.format_exc())) updateLatestErrorRecord(FAILED_TO_RETRIEVE_MDS_DATA) AddExtensionEvent(message=FAILED_TO_RETRIEVE_MDS_DATA) return None def getAzureDiagnosticMemoryData(accountName, accountKey, hostBase, startKey, endKey, deploymentId): try: waagent.Log("Retrieve diagnostic data: Memory") table = "LinuxMemoryVer2v0" tableService = TableService(account_name = accountName, account_key = accountKey, host_base = hostBase) ofilter = ("PartitionKey ge '{0}' and PartitionKey lt '{1}' " "and DeploymentId eq '{2}'").format(startKey, endKey, deploymentId) oselect = ("PercentAvailableMemory,DeploymentId") data = tableService.query_entities(table, ofilter, oselect, 1) if data is None or len(data) == 0: return None memoryPercent = 100 - float(data[0].PercentAvailableMemory) return memoryPercent except Exception as e: waagent.Error((u"Failed to retrieve diagnostic data(Memory): {0} {1}" "").format(e, traceback.format_exc())) updateLatestErrorRecord(FAILED_TO_RETRIEVE_MDS_DATA) AddExtensionEvent(message=FAILED_TO_RETRIEVE_MDS_DATA) return None class AzureDiagnosticData(object): def __init__(self, config): self.config = config accountName = config.getLADName() accountKey = config.getLADKey() hostBase = config.getLADHostBase() hostname = socket.gethostname() deploymentId = config.getVmDeploymentId() startKey, endKey = getAzureDiagnosticKeyRange() self.cpuPercent = getAzureDiagnosticCPUData(accountName, accountKey, hostBase, startKey, endKey, deploymentId) self.memoryPercent = getAzureDiagnosticMemoryData(accountName, accountKey, hostBase, startKey, endKey, deploymentId) def getCPUPercent(self): return self.cpuPercent def getMemoryPercent(self): return self.memoryPercent class AzureDiagnosticMetric(object): def __init__(self, config): self.config = config self.linux = LinuxMetric(self.config) self.azure = AzureDiagnosticData(self.config) self.timestamp = int(time.time()) - AzureTableDelay def getTimestamp(self): return self.timestamp def getCurrHwFrequency(self): return self.linux.getCurrHwFrequency() def getMaxHwFrequency(self): return self.linux.getMaxHwFrequency() def getCurrVMProcessingPower(self): return self.linux.getCurrVMProcessingPower() def getGuaranteedVMProcessingPower(self): return self.linux.getGuaranteedVMProcessingPower() def getMaxVMProcessingPower(self): return self.linux.getMaxVMProcessingPower() def getNumOfCoresPerCPU(self): return self.linux.getNumOfCoresPerCPU() def getNumOfThreadsPerCore(self): return self.linux.getNumOfThreadsPerCore() def getPhysProcessingPowerPerVCPU(self): return self.linux.getPhysProcessingPowerPerVCPU() def getProcessorType(self): return self.linux.getProcessorType() def getReferenceComputeUnit(self): return self.linux.getReferenceComputeUnit() def getVCPUMapping(self): return self.linux.getVCPUMapping() def getVMProcessingPowerConsumption(self): return self.azure.getCPUPercent() def getCurrMemAssigned(self): return self.linux.getCurrMemAssigned() def getGuaranteedMemAssigned(self): return self.linux.getGuaranteedMemAssigned() def getMaxMemAssigned(self): return self.linux.getMaxMemAssigned() def getVMMemConsumption(self): return self.azure.getMemoryPercent() def getNetworkAdapterIds(self): return self.linux.getNetworkAdapterIds() def getNetworkAdapterMapping(self, adapterId): return self.linux.getNetworkAdapterMapping(adapterId) def getMaxNetworkBandwidth(self, adapterId): return self.linux.getMaxNetworkBandwidth(adapterId) def getMinNetworkBandwidth(self, adapterId): return self.linux.getMinNetworkBandwidth(adapterId) def getNetworkReadBytes(self, adapterId): return self.linux.getNetworkReadBytes(adapterId) def getNetworkWriteBytes(self, adapterId): return self.linux.getNetworkWriteBytes(adapterId) def getNetworkPacketRetransmitted(self): return self.linux.getNetworkPacketRetransmitted() def getLastHardwareChange(self): return self.linux.getLastHardwareChange() class CPUInfo(object): @staticmethod def getCPUInfo(): cpuinfo = waagent.GetFileContents("/proc/cpuinfo") ret, lscpu = waagent.RunGetOutput("lscpu") return CPUInfo(cpuinfo, lscpu) def __init__(self, cpuinfo, lscpu): self.cpuinfo = cpuinfo self.lscpu = lscpu self.cores = 1; self.coresPerCpu = 1; self.threadsPerCore = 1; coresMatch = re.search("CPU(s):\s+(\d+)", self.lscpu) if coresMatch: self.cores = int(coresMatch.group(1)) coresPerCpuMatch = re.search("Core(s) per socket:\s+(\d+)", self.lscpu) if coresPerCpuMatch: self.coresPerCpu = int(coresPerCpuMatch.group(1)) threadsPerCoreMatch = re.search("Core(s) per socket:\s+(\d+)", self.lscpu) if threadsPerCoreMatch: self.threadsPerCore = int(threadsPerCoreMatch.group(1)) model = re.search("model name\s+:\s+(.*)\s", self.cpuinfo) vendorId = re.search("vendor_id\s+:\s+(.*)\s", self.cpuinfo) if model and vendorId: self.processorType = "{0}, {1}".format(model.group(1), vendorId.group(1)) else: self.processorType = None freqMatch = re.search("CPU MHz:\s+(.*)\s", self.lscpu) if freqMatch: self.frequency = float(freqMatch.group(1)) else: self.frequency = None ht = re.match("flags\s.*\sht\s", self.cpuinfo) self.isHTon = ht is not None def getNumOfCoresPerCPU(self): return self.coresPerCpu def getNumOfCores(self): return self.cores def getNumOfThreadsPerCore(self): return self.threadsPerCore def getProcessorType(self): return self.processorType def getFrequency(self): return self.frequency def isHyperThreadingOn(self): return self.isHTon def getCPUPercent(self): return psutil.cpu_percent() class MemoryInfo(object): def __init__(self): self.memInfo = psutil.virtual_memory() def getMemSize(self): return self.memInfo[0] / 1024 / 1024 #MB def getMemPercent(self): return self.memInfo[2] #% def getMacAddress(adapterId): nicAddrPath = os.path.join("/sys/class/net", adapterId, "address") mac = waagent.GetFileContents(nicAddrPath) mac = mac.strip() mac = mac.replace(":", "-") return mac def sameList(l1, l2): if l1 is None or l2 is None: return l1 == l2 if len(l1) != len(l2): return False for i in range(0, len(l1)): if l1[i] != l2[i]: return False return True class NetworkInfo(object): def __init__(self): self.nics = psutil.net_io_counters(pernic=True) self.nicNames = [] for nicName, stat in self.nics.iteritems(): if nicName != 'lo': self.nicNames.append(nicName) def getAdapterIds(self): return self.nicNames def getNetworkReadBytes(self, adapterId): net = psutil.net_io_counters(pernic=True) if net[adapterId] != None: bytes_recv1 = net[adapterId][1] time1 = time.time() time.sleep(0.2) net = psutil.net_io_counters(pernic=True) bytes_recv2 = net[adapterId][1] time2 = time.time() interval = (time2 - time1) return (bytes_recv2 - bytes_recv1) / interval else: return 0 def getNetworkWriteBytes(self, adapterId): net = psutil.net_io_counters(pernic=True) if net[adapterId] != None: bytes_sent1 = net[adapterId][0] time1 = time.time() time.sleep(0.2) net = psutil.net_io_counters(pernic=True) bytes_sent2 = net[adapterId][0] time2 = time.time() interval = (time2 - time1) return (bytes_sent2 - bytes_sent1) / interval else: return 0 def getNetstat(self): retCode, output = waagent.RunGetOutput("netstat -s", chk_err=False) return output def getNetworkPacketRetransmitted(self): netstat = self.getNetstat() match = re.search("(\d+)\s*segments retransmited", netstat) if match != None: return int(match.group(1)) else: waagent.Error("Failed to parse netstat output: {0}".format(netstat)) updateLatestErrorRecord(FAILED_TO_RETRIEVE_LOCAL_DATA) AddExtensionEvent(message=FAILED_TO_RETRIEVE_LOCAL_DATA) return None HwInfoFile = os.path.join(LibDir, "HwInfo") class HardwareChangeInfo(object): def __init__(self, networkInfo): self.networkInfo = networkInfo def getHwInfo(self): if not os.path.isfile(HwInfoFile): return None, None hwInfo = waagent.GetFileContents(HwInfoFile).split("\n") return int(hwInfo[0]), hwInfo[1:] def setHwInfo(self, timestamp, hwInfo): content = str(timestamp) content = content + "\n" + "\n".join(hwInfo) waagent.SetFileContents(HwInfoFile, content) def getLastHardwareChange(self): oldTime, oldMacs = self.getHwInfo() newMacs = map(lambda x : getMacAddress(x), self.networkInfo.getAdapterIds()) newTime = int(time.time()) newMacs.sort() if oldMacs is None or not sameList(newMacs, oldMacs): #Hardware changed if newTime < oldTime: waagent.Warn(("Hardware change detected. But the old timestamp " "is greater than now, {0}>{1}.").format(oldTime, newTime)) self.setHwInfo(newTime, newMacs) return newTime else: return oldTime class LinuxMetric(object): def __init__(self, config): self.config = config #CPU self.cpuInfo = CPUInfo.getCPUInfo() #Memory self.memInfo = MemoryInfo() #Network self.networkInfo = NetworkInfo() #Detect hardware change self.hwChangeInfo = HardwareChangeInfo(self.networkInfo) self.timestamp = int(time.time()) def getTimestamp(self): return self.timestamp def getCurrHwFrequency(self): return self.cpuInfo.getFrequency() def getMaxHwFrequency(self): return self.getCurrHwFrequency() def getCurrVMProcessingPower(self): if self.config.isCpuOverCommitted(): return None else: return self.cpuInfo.getNumOfCores() def getGuaranteedVMProcessingPower(self): return self.getCurrVMProcessingPower() def getMaxVMProcessingPower(self): return self.getCurrVMProcessingPower() def getNumOfCoresPerCPU(self): return self.cpuInfo.getNumOfCoresPerCPU() def getNumOfThreadsPerCore(self): return self.cpuInfo.getNumOfThreadsPerCore() def getPhysProcessingPowerPerVCPU(self): return 1 / float(self.getNumOfThreadsPerCore()) def getProcessorType(self): return self.cpuInfo.getProcessorType() def getReferenceComputeUnit(self): return self.getProcessorType() def getVCPUMapping(self): return "thread" if self.cpuInfo.isHyperThreadingOn() else "core" def getVMProcessingPowerConsumption(self): return self.memInfo.getMemPercent() def getCurrMemAssigned(self): if self.config.isMemoryOverCommitted(): return None else: return self.memInfo.getMemSize() def getGuaranteedMemAssigned(self): return self.getCurrMemAssigned() def getMaxMemAssigned(self): return self.getCurrMemAssigned() def getVMMemConsumption(self): return self.memInfo.getMemPercent() def getNetworkAdapterIds(self): return self.networkInfo.getAdapterIds() def getNetworkAdapterMapping(self, adapterId): return getMacAddress(adapterId) def getMaxNetworkBandwidth(self, adapterId): return 1000 #Mbit/s def getMinNetworkBandwidth(self, adapterId): return 1000 #Mbit/s def getNetworkReadBytes(self, adapterId): return self.networkInfo.getNetworkReadBytes(adapterId) def getNetworkWriteBytes(self, adapterId): return self.networkInfo.getNetworkWriteBytes(adapterId) def getNetworkPacketRetransmitted(self): return self.networkInfo.getNetworkPacketRetransmitted() def getLastHardwareChange(self): return self.hwChangeInfo.getLastHardwareChange() class VMDataSource(object): def __init__(self, config): self.config = config def collect(self): counters = [] if self.config.isLADEnabled(): metrics = AzureDiagnosticMetric(self.config) else: metrics = LinuxMetric(self.config) #CPU counters.append(self.createCounterCurrHwFrequency(metrics)) counters.append(self.createCounterMaxHwFrequency(metrics)) counters.append(self.createCounterCurrVMProcessingPower(metrics)) counters.append(self.createCounterGuaranteedVMProcessingPower(metrics)) counters.append(self.createCounterMaxVMProcessingPower(metrics)) counters.append(self.createCounterNumOfCoresPerCPU(metrics)) counters.append(self.createCounterNumOfThreadsPerCore(metrics)) counters.append(self.createCounterPhysProcessingPowerPerVCPU(metrics)) counters.append(self.createCounterProcessorType(metrics)) counters.append(self.createCounterReferenceComputeUnit(metrics)) counters.append(self.createCounterVCPUMapping(metrics)) counters.append(self.createCounterVMProcessingPowerConsumption(metrics)) #Memory counters.append(self.createCounterCurrMemAssigned(metrics)) counters.append(self.createCounterGuaranteedMemAssigned(metrics)) counters.append(self.createCounterMaxMemAssigned(metrics)) counters.append(self.createCounterVMMemConsumption(metrics)) #Network adapterIds = metrics.getNetworkAdapterIds() for adapterId in adapterIds: if adapterId.startswith('eth'): counters.append(self.createCounterAdapterId(adapterId)) counters.append(self.createCounterNetworkMapping(metrics, adapterId)) counters.append(self.createCounterMinNetworkBandwidth(metrics, adapterId)) counters.append(self.createCounterMaxNetworkBandwidth(metrics, adapterId)) counters.append(self.createCounterNetworkReadBytes(metrics, adapterId)) counters.append(self.createCounterNetworkWriteBytes(metrics, adapterId)) counters.append(self.createCounterNetworkPacketRetransmitted(metrics)) #Hardware change counters.append(self.createCounterLastHardwareChange(metrics)) #Error counters.append(self.createCounterError()) return counters def createCounterLastHardwareChange(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_LARGE, category = "config", name = "Last Hardware Change", value = metrics.getLastHardwareChange(), unit="posixtime") def createCounterError(self): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_LARGE, category = "config", name = "Error", value = getLatestErrorRecord()) def createCounterCurrHwFrequency(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "cpu", name = "Current Hw Frequency", value = metrics.getCurrHwFrequency(), unit = "MHz", refreshInterval = 60) def createCounterMaxHwFrequency(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "cpu", name = "Max Hw Frequency", value = metrics.getMaxHwFrequency(), unit = "MHz") def createCounterCurrVMProcessingPower(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "cpu", name = "Current VM Processing Power", value = metrics.getCurrVMProcessingPower(), unit = "compute unit") def createCounterMaxVMProcessingPower(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "cpu", name = "Max. VM Processing Power", value = metrics.getMaxVMProcessingPower(), unit = "compute unit") def createCounterGuaranteedVMProcessingPower(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "cpu", name = "Guaranteed VM Processing Power", value = metrics.getGuaranteedVMProcessingPower(), unit = "compute unit") def createCounterNumOfCoresPerCPU(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "cpu", name = "Number of Cores per CPU", value = metrics.getNumOfCoresPerCPU()) def createCounterNumOfThreadsPerCore(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "cpu", name = "Number of Threads per Core", value = metrics.getNumOfThreadsPerCore()) def createCounterPhysProcessingPowerPerVCPU(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "cpu", name = "Phys. Processing Power per vCPU", value = metrics.getPhysProcessingPowerPerVCPU()) def createCounterProcessorType(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "cpu", name = "Processor Type", value = metrics.getProcessorType()) def createCounterReferenceComputeUnit(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "cpu", name = "Reference Compute Unit", value = metrics.getReferenceComputeUnit()) def createCounterVCPUMapping(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "cpu", name = "vCPU Mapping", value = metrics.getVCPUMapping()) def createCounterVMProcessingPowerConsumption(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "cpu", name = "VM Processing Power Consumption", value = metrics.getVMProcessingPowerConsumption(), unit = "%", timestamp = metrics.getTimestamp(), refreshInterval = 60) def createCounterCurrMemAssigned(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "memory", name = "Current Memory assigned", value = metrics.getCurrMemAssigned(), unit = "MB") def createCounterMaxMemAssigned(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "memory", name = "Max Memory assigned", value = metrics.getMaxMemAssigned(), unit = "MB") def createCounterGuaranteedMemAssigned(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "memory", name = "Guaranteed Memory assigned", value = metrics.getGuaranteedMemAssigned(), unit = "MB") def createCounterVMMemConsumption(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "memory", name = "VM Memory Consumption", value = metrics.getVMMemConsumption(), unit = "%", timestamp = metrics.getTimestamp(), refreshInterval = 60) def createCounterAdapterId(self, adapterId): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "network", name = "Adapter Id", instance = adapterId, value = adapterId) def createCounterNetworkMapping(self, metrics, adapterId): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "network", name = "Mapping", instance = adapterId, value = metrics.getNetworkAdapterMapping(adapterId)) def createCounterMaxNetworkBandwidth(self, metrics, adapterId): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "network", name = "VM Maximum Network Bandwidth", instance = adapterId, value = metrics.getMaxNetworkBandwidth(adapterId), unit = "Mbit/s") def createCounterMinNetworkBandwidth(self, metrics, adapterId): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "network", name = "VM Minimum Network Bandwidth", instance = adapterId, value = metrics.getMinNetworkBandwidth(adapterId), unit = "Mbit/s") def createCounterNetworkReadBytes(self, metrics, adapterId): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_LARGE, category = "network", name = "Network Read Bytes", instance = adapterId, value = metrics.getNetworkReadBytes(adapterId), unit = "byte/s") def createCounterNetworkWriteBytes(self, metrics, adapterId): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_LARGE, category = "network", name = "Network Write Bytes", instance = adapterId, value = metrics.getNetworkWriteBytes(adapterId), unit = "byte/s") def createCounterNetworkPacketRetransmitted(self, metrics): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "network", name = "Packets Retransmitted", value = metrics.getNetworkPacketRetransmitted(), unit = "packets/min") def getStorageTimestamp(unixTimestamp): tformat = "{0:0>4d}{1:0>2d}{2:0>2d}T{3:0>2d}{4:0>2d}" ts = time.gmtime(unixTimestamp) return tformat.format(ts.tm_year, ts.tm_mon, ts.tm_mday, ts.tm_hour, ts.tm_min) def getStorageTableKeyRange(): #Round down by MonitoringInterval endTime = int(time.time()) / MonitoringInterval * MonitoringInterval endTime = endTime - AzureTableDelay startTime = endTime - MonitoringInterval return getStorageTimestamp(startTime), getStorageTimestamp(endTime) def getStorageMetrics(account, key, hostBase, table, startKey, endKey): try: waagent.Log("Retrieve storage metrics data.") tableService = TableService(account_name = account, account_key = key, host_base = hostBase) ofilter = ("PartitionKey ge '{0}' and PartitionKey lt '{1}'" "").format(startKey, endKey) oselect = ("TotalRequests,TotalIngress,TotalEgress,AverageE2ELatency," "AverageServerLatency,RowKey") metrics = tableService.query_entities(table, ofilter, oselect) waagent.Log("{0} records returned.".format(len(metrics))) return metrics except Exception as e: waagent.Error((u"Failed to retrieve storage metrics data: {0} {1}" "").format(e, traceback.format_exc())) updateLatestErrorRecord(FAILED_TO_RETRIEVE_STORAGE_DATA) AddExtensionEvent(message=FAILED_TO_RETRIEVE_STORAGE_DATA) return None def getDataDisks(): blockDevs = os.listdir('/sys/block') dataDisks = filter(lambda d : re.match("sd[c-z]", d), blockDevs) return dataDisks def getFirstLun(dev): path = os.path.join("/sys/block", dev, "device/scsi_disk") for lun in os.listdir(path): return int(lun[-1]) class DiskInfo(object): def __init__(self, config): self.config = config def getDiskMapping(self): osdiskVhd = "{0} {1}".format(self.config.getOSDiskAccount(), self.config.getOSDiskName()) osdisk = { "vhd":osdiskVhd, "type": self.config.getOSDiskType(), "caching": self.config.getOSDiskCaching(), "iops": self.config.getOSDiskSLAIOPS(), "throughput": self.config.getOSDiskSLAThroughput(), } diskMapping = { "/dev/sda": osdisk, } dataDisks = getDataDisks() if dataDisks is None or len(dataDisks) == 0: return diskMapping lunToDevMap = {} for dev in dataDisks: lun = getFirstLun(dev) lunToDevMap[lun] = dev diskCount = self.config.getDataDiskCount() for i in range(0, diskCount): lun = self.config.getDataDiskLun(i) datadiskVhd = "{0} {1}".format(self.config.getDataDiskAccount(i), self.config.getDataDiskName(i)) datadisk = { "vhd": datadiskVhd, "type": self.config.getDataDiskType(i), "caching": self.config.getDataDiskCaching(i), "iops": self.config.getDataDiskSLAIOPS(i), "throughput": self.config.getDataDiskSLAThroughput(i), } if lun in lunToDevMap: dev = lunToDevMap[lun] diskMapping[dev] = datadisk else: waagent.Warn("Couldn't find disk with lun: {0}".format(lun)) return diskMapping def isUserRead(op): if not op.startswith("user;"): return False op = op[5:] for prefix in ["Get", "List", "Preflight"]: if op.startswith(prefix): return True return False def isUserWrite(op): if not op.startswith("user;"): return False op = op[5:] for prefix in ["Put" ,"Set" ,"Clear" ,"Delete" ,"Create" ,"Snapshot"]: if op.startswith(prefix): return True return False def storageStat(metrics, opFilter): stat = {} stat['bytes'] = None stat['ops'] = None stat['e2eLatency'] = None stat['serverLatency'] = None stat['throughput'] = None if metrics is None: return stat metrics = filter(lambda x : opFilter(x.RowKey), metrics) stat['bytes'] = sum(map(lambda x : x.TotalIngress + x.TotalEgress, metrics)) stat['ops'] = sum(map(lambda x : x.TotalRequests, metrics)) if stat['ops'] != 0: stat['e2eLatency'] = sum(map(lambda x : x.TotalRequests * \ x.AverageE2ELatency, metrics)) / stat['ops'] stat['serverLatency'] = sum(map(lambda x : x.TotalRequests * \ x.AverageServerLatency, metrics)) / stat['ops'] #Convert to MB/s stat['throughput'] = float(stat['bytes']) / (1024 * 1024) / 60 return stat class AzureStorageStat(object): def __init__(self, metrics): self.metrics = metrics self.rStat = storageStat(metrics, isUserRead) self.wStat = storageStat(metrics, isUserWrite) def getReadBytes(self): return self.rStat['bytes'] def getReadOps(self): return self.rStat['ops'] def getReadOpE2ELatency(self): return self.rStat['e2eLatency'] def getReadOpServerLatency(self): return self.rStat['serverLatency'] def getReadOpThroughput(self): return self.rStat['throughput'] def getWriteBytes(self): return self.wStat['bytes'] def getWriteOps(self): return self.wStat['ops'] def getWriteOpE2ELatency(self): return self.wStat['e2eLatency'] def getWriteOpServerLatency(self): return self.wStat['serverLatency'] def getWriteOpThroughput(self): return self.wStat['throughput'] class StorageDataSource(object): def __init__(self, config): self.config = config def collect(self): counters = [] #Add disk mapping for resource disk counters.append(self.createCounterDiskMapping("/dev/sdb", "not mapped to vhd")) #Add disk mapping for osdisk and data disk diskMapping = DiskInfo(self.config).getDiskMapping() for dev, disk in diskMapping.iteritems(): counters.append(self.createCounterDiskMapping(dev, disk.get("vhd"))) counters.append(self.createCounterDiskType(dev, disk.get("type"))) counters.append(self.createCounterDiskCaching(dev, disk.get("caching"))) if disk.get("type") == "Premium": counters.append(self.createCounterDiskIOPS(dev, disk.get("iops"))) counters.append(self.createCounterDiskThroughput(dev, disk.get("throughput"))) accounts = self.config.getStorageAccountNames() for account in accounts: if self.config.getStorageAccountType(account) == "Standard": counters.extend(self.collectMetrixForStandardStorage(account)) return counters def collectMetrixForStandardStorage(self, account): counters = [] startKey, endKey = getStorageTableKeyRange() tableName = self.config.getStorageAccountMinuteTable(account) accountKey = self.config.getStorageAccountKey(account) hostBase = self.config.getStorageHostBase(account) metrics = getStorageMetrics(account, accountKey, hostBase, tableName, startKey, endKey) stat = AzureStorageStat(metrics) counters.append(self.createCounterStorageId(account)) counters.append(self.createCounterReadBytes(account, stat)) counters.append(self.createCounterReadOps(account, stat)) counters.append(self.createCounterReadOpE2ELatency(account, stat)) counters.append(self.createCounterReadOpServerLatency(account, stat)) counters.append(self.createCounterReadOpThroughput(account, stat)) counters.append(self.createCounterWriteBytes(account, stat)) counters.append(self.createCounterWriteOps(account, stat)) counters.append(self.createCounterWriteOpE2ELatency(account, stat)) counters.append(self.createCounterWriteOpServerLatency(account, stat)) counters.append(self.createCounterWriteOpThroughput(account, stat)) return counters def createCounterDiskType(self, dev, diskType): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "disk", name = "Storage Type", instance = dev, value = diskType) def createCounterDiskCaching(self, dev, caching): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "disk", name = "Caching", instance = dev, value = caching) def createCounterDiskThroughput(self, dev, throughput): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "disk", name = "SLA Throughput", instance = dev, unit = "MB/sec", value = throughput) def createCounterDiskIOPS(self, dev, iops): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "disk", name = "SLA", instance = dev, unit = "Ops/sec", value = iops) def createCounterReadBytes(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_LARGE, category = "storage", name = "Storage Read Bytes", instance = account, value = stat.getReadBytes(), unit = 'byte', refreshInterval = 60) def createCounterReadOps(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "storage", name = "Storage Read Ops", instance = account, value = stat.getReadOps(), refreshInterval = 60) def createCounterReadOpE2ELatency(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "storage", name = "Storage Read Op Latency E2E msec", instance = account, value = stat.getReadOpE2ELatency(), unit = 'ms', refreshInterval = 60) def createCounterReadOpServerLatency(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "storage", name = "Storage Read Op Latency Server msec", instance = account, value = stat.getReadOpServerLatency(), unit = 'ms', refreshInterval = 60) def createCounterReadOpThroughput(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "storage", name = "Storage Read Throughput E2E MB/sec", instance = account, value = stat.getReadOpThroughput(), unit = 'MB/s', refreshInterval = 60) def createCounterWriteBytes(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_LARGE, category = "storage", name = "Storage Write Bytes", instance = account, value = stat.getWriteBytes(), unit = 'byte', refreshInterval = 60) def createCounterWriteOps(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "storage", name = "Storage Write Ops", instance = account, value = stat.getWriteOps(), refreshInterval = 60) def createCounterWriteOpE2ELatency(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "storage", name = "Storage Write Op Latency E2E msec", instance = account, value = stat.getWriteOpE2ELatency(), unit = 'ms', refreshInterval = 60) def createCounterWriteOpServerLatency(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "storage", name = "Storage Write Op Latency Server msec", instance = account, value = stat.getWriteOpServerLatency(), unit = 'ms', refreshInterval = 60) def createCounterWriteOpThroughput(self, account, stat): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_DOUBLE, category = "storage", name = "Storage Write Throughput E2E MB/sec", instance = account, value = stat.getWriteOpThroughput(), unit = 'MB/s', refreshInterval = 60) def createCounterStorageId(self, account): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "storage", name = "Storage ID", instance = account, value = account) def createCounterDiskMapping(self, dev, vhd): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "storage", name = "Phys. Disc to Storage Mapping", instance = dev, value = vhd) class HvInfo(object): def __init__(self): self.hvName = None; self.hvVersion = None; root_dir = os.path.dirname(__file__) cmd = os.path.join(root_dir, "bin/hvinfo") ret, output = waagent.RunGetOutput(cmd, chk_err=False) print(ret) if ret ==0 and output is not None: lines = output.split("\n") if len(lines) >= 2: self.hvName = lines[0] self.hvVersion = lines[1] def getHvName(self): return self.hvName def getHvVersion(self): return self.hvVersion class StaticDataSource(object): def __init__(self, config): self.config = config def collect(self): counters = [] hvInfo = HvInfo() counters.append(self.createCounterCloudProvider()) counters.append(self.createCounterCpuOverCommitted()) counters.append(self.createCounterMemoryOverCommitted()) counters.append(self.createCounterDataProviderVersion()) counters.append(self.createCounterDataSources()) counters.append(self.createCounterInstanceType()) counters.append(self.createCounterVirtSln(hvInfo.getHvName())) counters.append(self.createCounterVirtSlnVersion(hvInfo.getHvVersion())) vmSLAThroughput = self.config.getVMSLAThroughput() if vmSLAThroughput is not None: counters.append(self.createCounterVMSLAThroughput(vmSLAThroughput)) vmSLAIOPS = self.config.getVMSLAIOPS() if vmSLAIOPS is not None: counters.append(self.createCounterVMSLAIOPS(vmSLAIOPS)) return counters def createCounterVMSLAThroughput(self, throughput): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "config", name = "SLA Max Disk Bandwidth per VM", unit = "Ops/sec", value = throughput) def createCounterVMSLAIOPS(self, iops): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_INT, category = "config", name = "SLA Max Disk IOPS per VM", unit = "Ops/sec", value = iops) def createCounterCloudProvider(self): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "config", name = "Cloud Provider", value = "Microsoft Azure") def createCounterVirtSlnVersion(self, hvVersion): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "config", name = "Virtualization Solution Version", value = hvVersion) def createCounterVirtSln(self, hvName): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "config", name = "Virtualization Solution", value = hvName) def createCounterInstanceType(self): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "config", name = "Instance Type", value = self.config.getVmSize()) def createCounterDataSources(self): dataSource = "wad" if self.config.isLADEnabled() else "local" return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "config", name = "Data Sources", value = dataSource) def createCounterDataProviderVersion(self): return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "config", name = "Data Provider Version", value = AzureEnhancedMonitorVersion) def createCounterMemoryOverCommitted(self): value = "yes" if self.config.isMemoryOverCommitted() else "no" return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "config", name = "Memory Over-Provisioning", value = value) def createCounterCpuOverCommitted(self): value = "yes" if self.config.isCpuOverCommitted() else "no" return PerfCounter(counterType = PerfCounterType.COUNTER_TYPE_STRING, category = "config", name = "CPU Over-Provisioning", value = value) class PerfCounterType(object): COUNTER_TYPE_INVALID = 0 COUNTER_TYPE_INT = 1 COUNTER_TYPE_DOUBLE = 2 COUNTER_TYPE_LARGE = 3 COUNTER_TYPE_STRING = 4 class PerfCounter(object): def __init__(self, counterType, category, name, value, instance="", unit="none", timestamp = None, refreshInterval=0): self.counterType = counterType self.category = category self.name = name self.instance = instance self.value = value self.unit = unit self.refreshInterval = refreshInterval if(timestamp): self.timestamp = timestamp else: self.timestamp = int(time.time()) self.machine = socket.gethostname() def __str__(self): return (u"{0};{1};{2};{3};{4};{5};{6};{7};{8};{9};\n" "").format(self.counterType, self.category, self.name, self.instance, 0 if self.value is not None else 1, self.value if self.value is not None else "", self.unit, self.refreshInterval, self.timestamp, self.machine) __repr__ = __str__ class EnhancedMonitor(object): def __init__(self, config): self.dataSources = [] self.dataSources.append(VMDataSource(config)) self.dataSources.append(StorageDataSource(config)) self.dataSources.append(StaticDataSource(config)) self.writer = PerfCounterWriter() def run(self): counters = [] for dataSource in self.dataSources: counters.extend(dataSource.collect()) clearLastErrorRecord() self.writer.write(counters) EventFile=os.path.join(LibDir, "PerfCounters") class PerfCounterWriter(object): def write(self, counters, maxRetry = 3, eventFile=EventFile): for i in range(0, maxRetry): try: self._write(counters, eventFile) waagent.Log(("Write {0} counters to event file." "").format(len(counters))) return except IOError as e: waagent.Warn((u"Write to perf counters file failed: {0}" "").format(e)) waagent.Log("Retry: {0}".format(i)) time.sleep(1) waagent.Error(("Failed to serialize perf counter to file:" "{0}").format(eventFile)) updateLatestErrorRecord(FAILED_TO_SERIALIZE_PERF_COUNTERS) AddExtensionEvent(message=FAILED_TO_SERIALIZE_PERF_COUNTERS) raise def _write(self, counters, eventFile): with open(eventFile, "w+") as F: F.write("".join(map(lambda c : str(c), counters)).encode("utf8")) class EnhancedMonitorConfig(object): def __init__(self, publicConfig, privateConfig): xmldoc = minidom.parse('/var/lib/waagent/SharedConfig.xml') self.deployment = xmldoc.getElementsByTagName('Deployment') self.role = xmldoc.getElementsByTagName('Role') self.configData = {} diskCount = 0 accountNames = [] for item in publicConfig["cfg"]: self.configData[item["key"]] = item["value"] if item["key"].startswith("disk.lun"): diskCount = diskCount + 1 if item["key"].endswith("minute.name"): accountNames.append(item["value"]) for item in privateConfig["cfg"]: self.configData[item["key"]] = item["value"] self.configData["disk.count"] = diskCount self.configData["account.names"] = accountNames def getVmSize(self): return self.configData.get("vmsize") def getVmRoleInstance(self): return self.role[0].attributes['name'].value def getVmDeploymentId(self): return self.deployment[0].attributes['name'].value def isMemoryOverCommitted(self): return self.configData.get("vm.memory.isovercommitted") def isCpuOverCommitted(self): return self.configData.get("vm.cpu.isovercommitted") def getScriptVersion(self): return self.configData.get("script.version") def isVerbose(self): flag = self.configData.get("verbose") return flag == "1" or flag == 1 def getVMSLAIOPS(self): return self.configData.get("vm.sla.iops") def getVMSLAThroughput(self): return self.configData.get("vm.sla.throughput") def getOSDiskName(self): return self.configData.get("osdisk.name") def getOSDiskAccount(self): osdiskConnMinute = self.getOSDiskConnMinute() return self.configData.get("{0}.name".format(osdiskConnMinute)) def getOSDiskConnMinute(self): return self.configData.get("osdisk.connminute") def getOSDiskConnHour(self): return self.configData.get("osdisk.connhour") def getOSDiskType(self): return self.configData.get("osdisk.type") def getOSDiskCaching(self): return self.configData.get("osdisk.caching") def getOSDiskSLAIOPS(self): return self.configData.get("osdisk.sla.iops") def getOSDiskSLAThroughput(self): return self.configData.get("osdisk.sla.throughput") def getDataDiskCount(self): return self.configData.get("disk.count") def getDataDiskLun(self, index): return self.configData.get("disk.lun.{0}".format(index)) def getDataDiskName(self, index): return self.configData.get("disk.name.{0}".format(index)) def getDataDiskAccount(self, index): return self.configData.get("disk.account.{0}".format(index)) def getDataDiskConnMinute(self, index): return self.configData.get("disk.connminute.{0}".format(index)) def getDataDiskConnHour(self, index): return self.configData.get("disk.connhour.{0}".format(index)) def getDataDiskType(self, index): return self.configData.get("disk.type.{0}".format(index)) def getDataDiskCaching(self, index): return self.configData.get("disk.caching.{0}".format(index)) def getDataDiskSLAIOPS(self, index): return self.configData.get("disk.sla.iops.{0}".format(index)) def getDataDiskSLAThroughput(self, index): return self.configData.get("disk.sla.throughput.{0}".format(index)) def getStorageAccountNames(self): return self.configData.get("account.names") def getStorageAccountKey(self, name): return self.configData.get("{0}.minute.key".format(name)) def getStorageAccountType(self, name): key = "{0}.minute.ispremium".format(name) return "Premium" if self.configData.get(key) == 1 else "Standard" def getStorageHostBase(self, name): return get_host_base_from_uri(self.getStorageAccountMinuteUri(name)) def getStorageAccountMinuteUri(self, name): return self.configData.get("{0}.minute.uri".format(name)) def getStorageAccountMinuteTable(self, name): uri = self.getStorageAccountMinuteUri(name) pos = uri.rfind('/') tableName = uri[pos+1:] return tableName def getStorageAccountHourUri(self, name): return self.configData.get("{0}.hour.uri".format(name)) def isLADEnabled(self): flag = self.configData.get("wad.isenabled") return flag == "1" or flag == 1 def getLADKey(self): return self.configData.get("wad.key") def getLADName(self): return self.configData.get("wad.name") def getLADHostBase(self): return get_host_base_from_uri(self.getLADUri()) def getLADUri(self): return self.configData.get("wad.uri")