advanced-monitoring/Worker/main.py:
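"""Collect NSX-T Tier-0 interface statistics and edge-node dataplane CPU
statistics from an Azure VMware Solution private cloud, convert the interface
counters into per-second rates between samples, and append the results to
local CSV files (interface.csv and cpu.csv). Also exports vCenter and region
details into the systemd environment and restarts Telegraf so it can pick
them up."""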

import requests
import json
import datetime
import pandas
from time import sleep
import urllib3
import os
import os.path
from azure.mgmt.avs import AVSClient
from azure.identity import DefaultAzureCredential
from azure.identity import ChainedTokenCredential, ManagedIdentityCredential
from azure.identity import AzureCliCredential
import logging

pid = "/tmp/nsx-stats.pid"

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.propagate = False
fh = logging.FileHandler("/var/log/nsx-stats.log", "w")
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
keep_fds = [fh.stream.fileno()]


class NSXTConnection:
    """Holds the NSX-T manager URI and credentials."""

    def __init__(self, nsxtUri, nsxtUsername, nsxtPassword):
        self.nsxtUri = nsxtUri
        self.nsxtUsername = nsxtUsername
        self.nsxtPassword = nsxtPassword
        self.nsxtPolicyUri = nsxtUri + "/policy/api/v1"


class NSXTTier0Interface:
    """A single interface on a Tier-0 gateway."""

    def __init__(self, path, edge_path, T0):
        self.path = path
        self.edge_path = edge_path
        self.name = path[path.rfind("/") + 1:]
        self.NSXTier0 = T0
        self.connection = T0.connection

    def getInterfaceStats(self):
        return _getInterfaceStats(self)


class NSXTTier0:
    """A Tier-0 gateway and its interfaces."""

    def __init__(self, path, name, connection):
        self.path = path
        self.name = name
        self.connection = connection
        self.local_services = _getT0LocaleServices(self.connection, self.path)
        self.interfaces = _getT0Interfaces(self)

    def getInterfacesStats(self):
        interfaceStats = []
        for interface in self.interfaces:
            interfaceStats.append(interface.getInterfaceStats())
        return pandas.concat(interfaceStats)


class NSXEdgeNode:
    """An NSX-T edge transport node."""

    def __init__(self, id, display_name, connection):
        self.id = id
        self.display_name = display_name
        self.connection = connection

    def getCPUStats(self):
        # Dataplane CPU stats come from the manager (non-policy) API.
        nodecpu = json.loads(_getAPIResults(
            nsxtConnection=self.connection,
            uri="/api/v1/transport-nodes/" + self.id + "/node/services/dataplane/cpu-stats",
            policy=False))
        df = pandas.json_normalize(nodecpu, record_path=['cores'])
        df.columns = df.columns.str.replace('/', '_', regex=False)
        df['precise_timestamp'] = datetime.datetime.now(tz=datetime.timezone.utc)
        df['t0_name'] = self.display_name
        return df


def _getT0Interfaces(tier0):
    interfaces = []
    results = json.loads(_getAPIResults(nsxtConnection=tier0.connection,
                                        uri=tier0.local_services + "/interfaces"))
    for tier0interface in results['results']:
        interfaces.append(NSXTTier0Interface(path=tier0interface['path'],
                                             edge_path=tier0interface['edge_path'],
                                             T0=tier0))
    return interfaces


def _getT0(nsxtConnection, id):
    results = json.loads(_getAPIResults(nsxtConnection=nsxtConnection, uri="/infra/tier-0s/" + id))
    return NSXTTier0(path=results['path'], name=results['display_name'], connection=nsxtConnection)


def getT0s(nsxtConnection):
    T0s = []
    results = json.loads(_getAPIResults(nsxtConnection=nsxtConnection, uri="/infra/tier-0s"))
    for t0 in results['results']:
        T0s.append(_getT0(nsxtConnection=nsxtConnection, id=t0['display_name']))
    return T0s


def _getT0LocaleServices(nsxtConnection, path):
    tier0locales = json.loads(_getAPIResults(nsxtConnection=nsxtConnection, uri=path + "/locale-services"))
    return tier0locales['results'][0]['path']


def _getInterfaceStats(tier0interface):
    statsuri = (tier0interface.path
                + "/statistics?enforcement_point_path=/infra/sites/default/enforcement-points/default"
                + "&edge_path=" + tier0interface.edge_path)
    tier0interfacestats = json.loads(_getAPIResults(nsxtConnection=tier0interface.connection, uri=statsuri))
    df = pandas.json_normalize(tier0interfacestats, record_path=['per_node_statistics'])
    df.columns = df.columns.str.replace('.', '_', regex=False)
    df['precise_timestamp'] = datetime.datetime.fromtimestamp(
        int(tier0interfacestats['per_node_statistics'][0]['last_update_timestamp']) / 1000,
        tz=datetime.timezone.utc)
    df['t0_name'] = tier0interface.NSXTier0.name
    df['t0_interface'] = tier0interface.name
    return df


def getEdgeNodes(nsxtConnection):
    nodes = []
    results = json.loads(_getAPIResults(nsxtConnection=nsxtConnection,
                                        uri="/api/v1/transport-nodes?node_types=EdgeNode",
                                        policy=False))['results']
    for node in results:
        nodes.append(NSXEdgeNode(id=node['node_id'], display_name=node['display_name'],
                                 connection=nsxtConnection))
    return nodes


def getCpuStats(nodes):
    frames = []
    for node in nodes:
        frames.append(node.getCPUStats())
    return pandas.concat(frames)


def _getAPIResults(nsxtConnection, uri, json_body=None, policy=True):
    # Choose the policy or manager API base URI.
    base_uri = nsxtConnection.nsxtPolicyUri if policy else nsxtConnection.nsxtUri
    r = requests.request(method="GET", url=base_uri + uri, json=json_body,
                         auth=requests.auth.HTTPBasicAuth(nsxtConnection.nsxtUsername,
                                                          nsxtConnection.nsxtPassword),
                         verify=False)
    # return the response body as text on success, raw bytes otherwise
    if r.status_code == 200:
        return r.text
    return r.content


def main():
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

    # set output files
    interface_csv = "./interface.csv"
    cpu_csv = "./cpu.csv"

    # get identity: Azure CLI when running locally, otherwise managed identity
    if os.environ.get('local') == "True":
        credential = AzureCliCredential()
    else:
        client_id = os.environ.get('client_id')
        if client_id:
            MSI_credential = ManagedIdentityCredential(client_id=client_id)
        else:
            MSI_credential = DefaultAzureCredential()
        credential = ChainedTokenCredential(MSI_credential)

    # collect cloud info from the AVS resource id:
    # /subscriptions/<sub>/resourceGroups/<rg>/providers/Microsoft.AVS/privateClouds/<name>
    resource_id = os.environ['AVS_CLOUD_ID']
    subscription_id = resource_id[15:resource_id[15:].find("/") + 15]
    avs_client = AVSClient(credential, subscription_id)
    resource_group_name = resource_id[resource_id.find("resourceGroups/") + 15:
                                      resource_id.find("/", resource_id.find("resourceGroups/") + 15)]
    private_cloud_name = resource_id[resource_id.find("privateClouds/") + 14:]
    print("Subscription Id: {}\r\nResource Group Name: {}\r\nCloud Name: {}".format(
        subscription_id, resource_group_name, private_cloud_name))
    logger.debug("Subscription Id: {}\r\nResource Group Name: {}\r\nCloud Name: {}".format(
        subscription_id, resource_group_name, private_cloud_name))

    # get cloud object
    cloud = avs_client.private_clouds.get(resource_group_name=resource_group_name,
                                          private_cloud_name=private_cloud_name)

    # collect more info
    region_id = cloud.location
    nsxUri = cloud.endpoints.nsxt_manager[:-1]  # drop trailing slash
    cloud_credentials = avs_client.private_clouds.list_admin_credentials(resource_group_name, cloud.name)

    # set env for telegraf
    os.environ["VCSA_URI"] = cloud.endpoints.vcsa
    os.environ["VCSA_USER"] = cloud_credentials.vcenter_username
    os.environ["VCSA_PASS"] = cloud_credentials.vcenter_password
    os.environ["REGION"] = region_id
    os.system("systemctl stop telegraf")
    # import telegraf vars into the systemd environment
    os.system('systemctl import-environment VCSA_URI')
    os.system('systemctl import-environment VCSA_USER')
    os.system('systemctl import-environment VCSA_PASS')
    os.system('systemctl import-environment REGION')
    os.system('systemctl import-environment AVS_CLOUD_ID')
    sleep(10)
    os.system("systemctl start telegraf")

    # connect to NSX-T
    nsxtConnection = NSXTConnection(nsxtUri=nsxUri,
                                    nsxtUsername=cloud_credentials.nsxt_username,
                                    nsxtPassword=cloud_credentials.nsxt_password)

    ### Get T0s Interfaces ###
    nsxtT0 = getT0s(nsxtConnection=nsxtConnection)[0]
    ### Get Edge Transport Nodes ###
    nodes = getEdgeNodes(nsxtConnection=nsxtConnection)

    count = 0
    # main loop
    while True:
        try:
            # get stats
            interfacestats = nsxtT0.getInterfacesStats()
            cpustats = getCpuStats(nodes=nodes)
            # only compute deltas once a previous sample exists
            if count == 1:
                # build one delta row (per-second rates) per interface
                delta_rows = []
                for index, row in interfacestats.iterrows():
                    # find the corresponding row in the previous sample
                    df2row = interfacestatsold.loc[interfacestatsold['t0_interface'] == row['t0_interface']]
                    delta_row = {}
                    # loop through cols
                    for col_name, value in row.items():
                        # try to compute a per-second rate; on error assume a non-numeric column
                        try:
                            elapsed_s = (int(row['last_update_timestamp'])
                                         - int(df2row['last_update_timestamp'].iloc[0])) / 1000
                            delta_row[col_name] = (int(row[col_name])
                                                   - int(df2row[col_name].iloc[0])) / elapsed_s
                        except (ValueError, TypeError, KeyError, IndexError, ZeroDivisionError):
                            delta_row[col_name] = value
                    delta_rows.append(delta_row)
                delta_frame = pandas.DataFrame(delta_rows, columns=interfacestats.columns)

                # write to disk: with header if the file is new, append if it exists
                if os.path.isfile(interface_csv):
                    delta_frame.to_csv(interface_csv, index=False, mode="a", header=False)
                else:
                    delta_frame.to_csv(interface_csv, index=False, mode="a", header=True)
                if os.path.isfile(cpu_csv):
                    cpustats.to_csv(cpu_csv, index=False, mode="a", header=False)
                else:
                    cpustats.to_csv(cpu_csv, index=False, mode="a", header=True)
                # keep a copy of the frame for the next loop
                interfacestatsold = interfacestats.copy(deep=True)
            else:
                # make the old frame if it does not exist yet
                interfacestatsold = interfacestats.copy(deep=True)
        except Exception as e:
            print(e)
            logger.debug(e)
            break
        # set to 1 after the first run
        count = 1
        # sleep for 60 seconds
        sleep(60)
    return


if __name__ == '__main__':
    main()