AzureMonitorAgent/ama_tst/modules/helpers.py (228 lines of code) (raw):
import os
import json
import platform
import subprocess
from errors import error_info
from error_codes import *
CONFIG_DIR = '/etc/opt/microsoft/azuremonitoragent/config-cache/configchunks'
METRICS_FILE = "/etc/opt/microsoft/azuremonitoragent/config-cache/metricCounters.json"
# backwards compatible input() function for Python 2 vs 3
try:
input = raw_input
except NameError:
pass
try:
FileNotFoundError
except NameError:
FileNotFoundError = IOError
# backwards compatible devnull variable for Python 3.3 vs earlier
try:
DEVNULL = subprocess.DEVNULL
except:
DEVNULL = open(os.devnull)
general_info = dict()
def geninfo_lookup(key):
try:
val = general_info[key]
except KeyError:
return None
return val
def get_input(question, check_ans=None, no_fit=None):
if check_ans == None and no_fit == None:
return input(question)
answer = input(" {0}: ".format(question))
while (not check_ans(answer.lower())):
print("Unclear input. {0}".format(no_fit))
answer = input(" {0}: ".format(question))
return answer
def is_arc_installed():
"""
Check if this is an Arc machine
"""
# Using systemctl to check this since Arc only supports VMs that have systemd
check_arc = os.system('systemctl status himdsd 1>/dev/null 2>&1')
return check_arc == 0
def find_vm_bits():
cpu_info = subprocess.check_output(['lscpu'], universal_newlines=True)
cpu_opmodes = (cpu_info.split('\n'))[1]
cpu_bits = cpu_opmodes[-6:]
return cpu_bits
def find_vm_distro():
"""
Finds the Linux Distribution this vm is running on.
"""
vm_dist = vm_id = vm_ver = None
parse_manually = False
try:
vm_dist, vm_ver, vm_id = platform.linux_distribution()
except AttributeError:
try:
vm_dist, vm_ver, vm_id = platform.dist()
except AttributeError:
# Falling back to /etc/os-release distribution parsing
pass
# Some python versions *IF BUILT LOCALLY* (ex 3.5) give string responses (ex. 'bullseye/sid') to platform.dist() function
# This causes exception in the method below. Thus adding a check to switch to manual parsing in this case
try:
temp_vm_ver = int(vm_ver.split('.')[0])
except:
parse_manually = True
if (not vm_dist and not vm_ver) or parse_manually: # SLES 15 and others
try:
with open('/etc/os-release', 'r') as fp:
for line in fp:
if line.startswith('ID='):
vm_dist = line.split('=')[1]
vm_dist = vm_dist.split('-')[0]
vm_dist = vm_dist.replace('\"', '').replace('\n', '')
vm_dist = vm_dist.lower()
elif line.startswith('VERSION_ID='):
vm_ver = line.split('=')[1]
vm_ver = vm_ver.replace('\"', '').replace('\n', '')
vm_ver = vm_ver.lower()
except (FileNotFoundError, AttributeError) as e: # indeterminate OS
return (None, None, e)
return (vm_dist, vm_ver, None)
def find_package_manager():
global general_info
"""
Checks which package manager is on the system
"""
pkg_manager = ""
# check if debian system
if (os.path.isfile("/etc/debian_version")):
try:
subprocess.check_output("command -v dpkg", shell=True)
pkg_manager = "dpkg"
except subprocess.CalledProcessError:
pass
# check if redhat system
elif (os.path.isfile("/etc/redhat_version")):
try:
subprocess.check_output("command -v rpm", shell=True)
pkg_manager = "rpm"
except subprocess.CalledProcessError:
pass
# likely SUSE or modified VM, just check dpkg and rpm
if (pkg_manager == ""):
try:
subprocess.check_output("command -v dpkg", shell=True)
pkg_manager = "dpkg"
except subprocess.CalledProcessError:
try:
subprocess.check_output("command -v rpm", shell=True)
pkg_manager = "rpm"
except subprocess.CalledProcessError:
pass
general_info['PKG_MANAGER'] = pkg_manager
return pkg_manager
def get_package_version(pkg):
pkg_mngr = geninfo_lookup('PKG_MANAGER')
# dpkg
if (pkg_mngr == 'dpkg'):
return get_dpkg_pkg_version(pkg)
# rpm
elif (pkg_mngr == 'rpm'):
return get_rpm_pkg_version(pkg)
else:
return (None, None)
# Package Info
def get_dpkg_pkg_version(pkg):
try:
dpkg_info = subprocess.check_output(['dpkg', '-s', pkg], universal_newlines=True,\
stderr=subprocess.STDOUT)
dpkg_lines = dpkg_info.split('\n')
for line in dpkg_lines:
if (line.startswith('Package: ') and not line.endswith(pkg)):
# wrong package
return (None, None)
if (line.startswith('Status: ') and not line.endswith('installed')):
# not properly installed
return (None, None)
if (line.startswith('Version: ')):
version = (line.split())[-1]
return (version, None)
return (None, None)
except subprocess.CalledProcessError as e:
return (None, e.output)
def get_rpm_pkg_version(pkg):
try:
rpm_info = subprocess.check_output(['rpm', '-qi', pkg], universal_newlines=True,\
stderr=subprocess.STDOUT)
if ("package {0} is not installed".format(pkg) in rpm_info):
# didn't find package
return (None, None)
rpm_lines = rpm_info.split('\n')
for line in rpm_lines:
parsed_line = line.split()
if (parsed_line[0] == 'Name'):
# ['Name', ':', name]
name = parsed_line[2]
if (name != pkg):
# wrong package
return (None, None)
if (parsed_line[0] == 'Version'):
# ['Version', ':', version]
version = parsed_line[2]
return (version, None)
return (None, None)
except subprocess.CalledProcessError as e:
return (None, e.output)
def find_ama_version():
"""
Gets a list of all AMA versions installed on the VM
"""
try:
config_dirs = filter((lambda x : x.startswith("Microsoft.Azure.Monitor.AzureMonitorLinuxAgent-")), os.listdir("/var/lib/waagent"))
ama_vers = list(map((lambda x : (x.split('-'))[-1]), config_dirs))
except FileNotFoundError as e:
return (None, e)
return (ama_vers, None)
def check_ama_installed(ama_vers):
"""
Checks to verify AMA is installed and only has one version installed at a time
"""
ama_exists = ((ama_vers != None) and (len(ama_vers) > 0))
ama_unique = (ama_exists and (len(ama_vers) == 1))
return (ama_exists, ama_unique)
def run_cmd_output(cmd):
"""
Common logic to run any command and check/get its output for further use
"""
try:
out = subprocess.check_output(cmd, shell=True, universal_newlines=True, stderr=subprocess.STDOUT)
return out
except subprocess.CalledProcessError as e:
return (e.output)
def find_dcr_workspace():
global general_info
if 'DCR_WORKSPACE_ID' in general_info and 'DCR_REGION' in general_info:
return (general_info['DCR_WORKSPACE_ID'], general_info['DCR_REGION'], None)
dcr_workspace = set()
dcr_region = set()
me_region = set()
general_info['URL_SUFFIX'] = '.com'
try:
for file in os.listdir(CONFIG_DIR):
file_path = CONFIG_DIR + "/" + file
with open(file_path) as f:
result = json.load(f)
channels = result['channels']
for channel in channels:
if channel['protocol'] == 'ods':
# parse dcr workspace id
endpoint_url = channel['endpoint']
workspace_id = endpoint_url.split('https://')[1].split('.ods')[0]
dcr_workspace.add(workspace_id)
# parse dcr region
token_endpoint_uri = channel['tokenEndpointUri']
region = token_endpoint_uri.split('Location=')[1].split('&')[0]
dcr_region.add(region)
# parse url suffix
if '.us' in endpoint_url:
general_info['URL_SUFFIX'] = '.us'
if '.cn' in endpoint_url:
general_info['URL_SUFFIX'] = '.cn'
if channel['protocol'] == 'me':
# parse ME region
endpoint_url = channel['endpoint']
region = endpoint_url.split('https://')[1].split('.monitoring')[0]
me_region.add(region)
except Exception as e:
return (None, None, e)
general_info['DCR_WORKSPACE_ID'] = dcr_workspace
general_info['DCR_REGION'] = dcr_region
general_info['ME_REGION'] = me_region
return (dcr_workspace, dcr_region, None)
def find_dce():
global general_info
dce = set()
try:
for file in os.listdir(CONFIG_DIR):
file_path = CONFIG_DIR + "/" + file
with open(file_path) as f:
result = json.load(f)
channels = result['channels']
for channel in channels:
if channel['protocol'] == 'gig':
# parse dce logs ingestion endpoint
ingest_endpoint_url = channel['endpointUriTemplate']
ingest_endpoint = ingest_endpoint_url.split('https://')[1].split('/')[0]
dce.add(ingest_endpoint)
# parse dce configuration access endpoint
configuration_endpoint_url = channel['tokenEndpointUri']
configuration_endpoint = configuration_endpoint_url.split('https://')[1].split('/')[0]
dce.add(configuration_endpoint)
except Exception as e:
return (None, None, e)
general_info['DCE'] = dce
return (dce, None)
def is_metrics_configured():
global general_info
if 'metrics' in general_info:
return general_info['metrics']
with open(METRICS_FILE) as f:
output = f.read(2)
if output != '[]':
general_info['metrics'] = True
else:
general_info['metrics'] = False
return general_info['metrics']