# vmassist/linux/vmassist.py (629 lines of code) (raw)
# Azure/azure-support-scripts
#
# Copyright (c) Microsoft Corporation
#
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify, merge,
# publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons
# to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
# NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import argparse
import os
import sys
import socket
import requests
import logging
import subprocess
import re
# for os-release (initially)
import csv
import pathlib
# network checking
import socket
import json
# For talking to the wire server and decoding responses
import http.client
from xml.etree import ElementTree
### COMMAND LINE ARGUMENT HANDLING
parser = argparse.ArgumentParser(
    description="stuff"
)
# -b carries the values gathered by the bash wrapper, packed as "KEY=value|KEY=value|..."
parser.add_argument('-b', '--bash', required=True, type=str)
parser.add_argument('-r', '--report', action='store_true') # this is just to 'catch' a bash-side parameter, we don't use it
parser.add_argument('-d', '--debug', action='store_true')
parser.add_argument('-v', '--verbose', action='count', default=0)
parser.add_argument('-l', '--log', type=str, required=False, default='/var/log/azure/'+os.path.basename(__file__)+'.log')
parser.add_argument('-t', '--noterm', action='store_true') # mainly used for coloring output
args = parser.parse_args()
# TODO: implement using verbosity level
# --debug implies at least one level of verbosity
if args.debug and args.verbose == 0:
    args.verbose = 1
# example bash value:
# bash="DISTRO=debian|SERVICE=walinuxagent.service|UNIT=active|PY=/usr/bin/python3.8|PYCOUNT=1|PYREQ=loaded|PYALA=loaded"
# Split each segment on the FIRST '=' only so values that themselves contain '='
# survive intact; empty segments (e.g. a trailing '|') are skipped and a bare
# key with no '=' maps to an empty string instead of raising ValueError.
bashArgs = {
    name: value
    for name, _, value in (segment.partition("=") for segment in args.bash.split("|") if segment)
}
# any value can be extracted with
# bashArgs.get('NAME', "DefaultString")
# ex:
# bashArgs.get('PY',"N/A")
### END COMMAND LINE ARGUMENT HANDLING
### UTILS
#### UTIL VARs and OBJs
# Everything is written to the log file at DEBUG level; the module logger
# created below inherits this root configuration.
logging.basicConfig(
    filename=args.log,
    level=logging.DEBUG,
    format='%(asctime)s %(message)s',
)
logger = logging.getLogger(__name__)
# start logging as soon as possible
logger.info(f"Python script started:{os.path.basename(__file__)}")
# when any verbosity was requested, mirror the log onto the console as well
if args.verbose > 0:
    logging.root.addHandler(logging.StreamHandler(sys.stdout))
    logger.info("Debug on")
#### END UTIL VARS
#### UTIL FUNCTIONS
def colorPrint(color, strIn):
    """Return *strIn* wrapped in the ANSI escape *color*, despite the name nothing is printed.

    When the --noterm flag was given the input is handed back untouched;
    otherwise the result is the colour prefix, the text, one space and the
    ANSI reset sequence.
    """
    if args.noterm:
        return strIn
    return color + f"{strIn} \033[00m"
def cRed(strIn):
    """Colour *strIn* using escape code 91 (used for errors)."""
    return colorPrint("\033[91m", strIn)


def cGreen(strIn):
    """Colour *strIn* using escape code 92 (used for OK states)."""
    return colorPrint("\033[92m", strIn)


def cYellow(strIn):
    """Colour *strIn* using escape code 93 (used for warnings)."""
    return colorPrint("\033[93m", strIn)


def cBlack(strIn):
    """Colour *strIn* using escape code 98 (the fallback when nothing else matched)."""
    return colorPrint("\033[98m", strIn)
def colorString(strIn, redVal="dead", greenVal="active", yellowVal="inactive"):
    """Colour *strIn* according to which marker word it contains.

    All arguments are coerced to str and compared case-insensitively as
    substrings. Markers are tried in the order red, yellow, green so that
    errors win over warnings, and warnings over "OK" (note the default
    yellow marker "inactive" contains the default green marker "active").
    Text matching no marker is returned via cBlack().
    """
    text = str(strIn)
    lowered = text.lower()
    # ordered so that errors come first, then warnings and eventually "I guess it's OK"
    for marker, painter in ((redVal, cRed), (yellowVal, cYellow), (greenVal, cGreen)):
        if str(marker).lower() in lowered:
            return painter(text)
    return cBlack(text)
#### END UTIL FUNCS
### END UTILS
### MAIN CODE
#### Global vars setup
fullPercent=90
wireIP="168.63.129.16"
imdsIP="169.254.169.254"
# debug percentage
if args.verbose > 0:
    fullPercent=20
# parse out os-release and put the values into a dict
# - blank lines and '#' comment lines are skipped
# - the csv reader strips double quotes around values
# - lines without '=' are ignored, and an unquoted '=' inside a value is kept
path = pathlib.Path("/etc/os-release")
with open(path) as stream:
    reader = csv.reader(
        (line for line in stream if line.strip() and not line.lstrip().startswith("#")),
        delimiter="=",
    )
    os_release = {row[0].strip(): "=".join(row[1:]) for row in reader if len(row) >= 2}
# family comes from ID_LIKE when present, otherwise from ID (None if neither is defined)
osrID=os_release.get("ID_LIKE", os_release.get("ID"))
# VERSION_ID is optional in os-release and is not always "major.minor":
# it may be a bare major ("12"), carry extra components ("15.5.1") or be absent.
# Take the leading major[.minor] digits and fall back to 0 for anything missing.
_verMatch = re.match(r"\s*(\d+)(?:\.(\d+))?", os_release.get("VERSION_ID") or "")
osMajS = _verMatch.group(1) if _verMatch else "0"
osMinS = (_verMatch.group(2) if _verMatch else None) or "0"
osMaj=int(osMajS)
osMin=int(osMinS)
# TODO: Add a family / major version check for 'supported' and "doesn't work" checks
# TODO: perhaps add a best-effort flag, wrap things that might not work in 'best effort' mode
#  -- weird versions - OEL, Alma, Rocky
# holding dicts for all the different things we will validate
bins={}
services={}
checks={}
findings={}
# took out the part to put some default findings in, delete them if we find something bad
#### END Global vars
#### Main logic functions
# Failures treated as "lookup failed": the tool exited non-zero, or the tool
# itself could not be started (e.g. it is not installed).
_PKG_LOOKUP_ERRORS = (subprocess.CalledProcessError, OSError)


def _pkgCmdOutput(argv, quiet=True):
    """Run *argv* (argument list, no shell) and return stdout decoded and stripped.

    stderr is discarded when *quiet* is true, otherwise it is inherited.
    Raises CalledProcessError on a non-zero exit and OSError when the
    executable cannot be started.
    """
    return subprocess.check_output(
        argv, stderr=subprocess.DEVNULL if quiet else None
    ).decode().strip()


def _repoField(pattern, text):
    """Return the text between the first and second ':' of the first match of *pattern*, or None."""
    found = re.search(pattern, text)
    if found is None:
        return None
    parts = found.group().strip().split(":")
    return parts[1].strip() if len(parts) > 1 else None


def _debianOwner(binPath, binPathIn):
    """Return (pkg, repo) for a file on a dpkg/apt system."""
    pkg = None
    try:
        # Find what package owns the binary
        pkg = _pkgCmdOutput(["dpkg", "-S", binPath]).split(":")[0]
    except _PKG_LOOKUP_ERRORS:
        logger.info(f"issue validating {binPath}, reverting to original path: {binPathIn}")
        try:
            pkg = _pkgCmdOutput(["dpkg", "-S", binPathIn]).split(":")[0]
        except _PKG_LOOKUP_ERRORS as e:
            logger.info(f"All attempts to validate {binPathIn} have failed. Likely a rogue file: {getattr(e, 'output', e)}")
    if pkg is None:
        # binary not found or may be source installed (no pkg)
        return f"no file or owning pkg for {binPathIn}", "n/a"
    # find what repository the package came from; the whole "Origin: ..." line is kept
    origin = None
    try:
        origin = re.search("Origin.*", _pkgCmdOutput(["apt-cache", "show", "--no-all-versions", pkg]))
    except _PKG_LOOKUP_ERRORS:
        pass
    if origin is None:
        # no Origin line: probably a manual install (dpkg) or installed from source
        logger.info(f"package {pkg} does not appear to have come from a repository")
        return pkg, "no repo"
    return pkg, origin.group()


def _fedoraOwner(binPath):
    """Return (pkg, repo) for a file on an rpm/dnf system."""
    try:
        rpm = _pkgCmdOutput(["rpm", "-q", "--whatprovides", binPath])
    except _PKG_LOOKUP_ERRORS as e:
        return f"no file or owning pkg: {getattr(e, 'output', e)}", "n/a"
    try:
        result = subprocess.run(["dnf", "info", rpm], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True)
    except subprocess.CalledProcessError as e:
        # we didn't get a match, probably a manual install (rpm), built from source, or a general DNF failure
        return rpm, f"repo search failed: {e.stderr.decode()}"
    except OSError as e:
        # dnf itself could not be executed
        return rpm, f"repo search failed: {e}"
    dnfOut = result.stdout.decode().strip()
    # Repo line should look like "From repo : [reponame]"; when it is absent fall
    # back to the "Repository : ..." line rather than failing on a missing match
    repo = _repoField("From repo.*", dnfOut) or _repoField("Repository.*", dnfOut)
    if not repo:
        repo = "repo search failed: no repository line in dnf output"
    return rpm, repo


def _rpmOwner(binPath, infoArgv, repoPattern, quiet):
    """Return (pkg, repo) using an rpm name query plus *infoArgv* (zypper/tdnf info)."""
    try:
        rpm = _pkgCmdOutput(
            ["rpm", "-q", "--queryformat", "%{NAME}", "--whatprovides", binPath], quiet=quiet
        )
    except _PKG_LOOKUP_ERRORS as e:
        return f"no file or owning pkg: {e}", "n/a"
    repo = None
    try:
        repo = _repoField(repoPattern, _pkgCmdOutput(infoArgv + [rpm], quiet=quiet))
    except _PKG_LOOKUP_ERRORS:
        pass
    if repo is None:
        # we didn't get a match, probably a manual install (rpm) or from source
        repo = "not from a repo"
    return rpm, repo


def validateBin(binPathIn):
    """Record which package and repository own the file *binPathIn*.

    The path is dereferenced with os.path.realpath before the package
    manager is queried; the query tool is chosen from the module-level
    OS family (osrID). The result is stored in the module-level ``bins``
    dict under the path exactly as passed in, as
    ``{"exe": binPathIn, "pkg": <str>, "repo": <str>}``.

    Lookup failures never raise: they are reported through descriptive
    "pkg"/"repo" strings instead. Commands are run as argument lists
    (no shell), so paths containing spaces or shell metacharacters are
    passed through literally. Returns None.
    """
    logger.info("Validating " + binPathIn)
    # the passed value is kept separately in case the dereferenced path cannot be resolved
    binPath = os.path.realpath(binPathIn)
    if binPathIn != binPath:
        logger.info(f"Link found: {binPathIn} points to {binPath}, verify outputs if this returns empty data")
    if osrID == "debian":
        pkg, repo = _debianOwner(binPath, binPathIn)
    elif osrID == "fedora":
        pkg, repo = _fedoraOwner(binPath)
    elif osrID == "suse":
        pkg, repo = _rpmOwner(binPath, ["zypper", "--quiet", "--no-refresh", "info"], "Repository.*", quiet=True)
    elif osrID in ("mariner", "azurelinux"):
        pkg, repo = _rpmOwner(binPath, ["tdnf", "--installed", "info"], "Repo.*", quiet=False)
    else:
        print("Unable to determine OS family from os-release")
        pkg, repo = "packaging system unknown", "n/a"
    thisBin = {"exe": binPathIn, "pkg": pkg, "repo": repo}
    logger.info(f"{binPath} owned by package '{pkg}' from repo '{repo}'")
    bins[binPathIn] = thisBin
def _unitProperty(unitName, prop):
    """Return the value of one ``systemctl show`` property of *unitName* ('' when unset)."""
    shown = subprocess.check_output(
        ["systemctl", "show", unitName, f"--property={prop}"]
    ).decode().strip()
    # output has the form "Property=value"; keep everything after the first '='
    return shown.partition("=")[2]


def checkService(unitName, package=False):
    """Collect run state, enablement and (optionally) package ownership of a systemd unit.

    The result is stored in the module-level ``services`` dict under
    *unitName*. Every key read by the report code ("svc", "status",
    "config", "path", "pkg", "repo") is always present: placeholder
    values remain when the unit does not exist or when *package* is
    false, so callers never hit a KeyError.

    When *package* is true the unit file path is passed to validateBin()
    to find the owning package and repository; the temporary entry that
    call creates in ``bins`` is removed again. Returns None.
    """
    logger.info("Service/Unit check " + unitName)
    thisSvc = {
        "svc": unitName,
        "status": "undef",   # replaced below once the unit state is known
        "config": "undef",
        "path": "n/a",
        "pkg": "n/a",
        "repo": "n/a",
    }
    unitStat = 0  # 'systemctl status' return code, 0 meaning running/OK
    # First off, let us check if the unit even exists
    try:
        subprocess.check_output(["systemctl", "status", unitName])
        #0 program is running or service is OK <<= default value of unitStat
        #1 program is dead and /var/run pid file exists
        #2 program is dead and /var/lock lock file exists
        #3 program is not running
        #4 program or service status is unknown
        #5-99 reserved for future LSB use
        #100-149 reserved for distribution use
        #150-199 reserved for application use
        #200-254 reserved
    except subprocess.CalledProcessError as sysctlErr:
        # we will be referencing this return code later, assuming it's not 4 - see table above
        unitStat = sysctlErr.returncode
        if unitStat == 4:
            thisSvc["status"] = "nonExistantService"
        else:
            logger.info(f"Service {unitName} status returned unexpected value: {sysctlErr.returncode} with text: {sysctlErr.output}")
    except OSError as sysctlErr:
        # systemctl itself could not be executed; treat the unit state as unknown
        unitStat = 4
        logger.info(f"Unable to run systemctl for {unitName}: {sysctlErr}")
    # Unit was determined to exist (rc below 4), so validate the service status and its unit file
    if unitStat < 4:
        # Active/Sub could be inactive(dead) in an interactive console
        thisSvc["config"] = _unitProperty(unitName, "UnitFileState")
        active = _unitProperty(unitName, "ActiveState")
        sub = _unitProperty(unitName, "SubState")
        # make the 'status' look like the output of `systemctl status`
        thisSvc["status"] = f"{active}({sub})"
        thisSvc["path"] = _unitProperty(unitName, "FragmentPath")
        if package:
            # We need to process the owner and path of the unit if (package) was set by the caller
            logger.info(f"Checking owners for unit: {unitName} using validateBins")
            # No need to re-code all this, just call validateBin(binName)
            validateBin(thisSvc["path"])
            owner = bins[thisSvc["path"]]
            thisSvc["pkg"] = owner['pkg']
            thisSvc["repo"] = owner['repo']
            # get rid of this extra entry in bins caused by calling validateBins()
            del bins[thisSvc["path"]]
        else:
            logger.info(f"package details for {unitName} not requested, skipping")
    logger.info(f"{unitName} unit file found at {thisSvc['path']} owned by package '{thisSvc['pkg']}' from repo: {thisSvc['repo']}")
    services[unitName] = thisSvc
def checkHTTPURL(urlIn):
    """GET *urlIn* with a ``Metadata: True`` header and summarise the outcome.

    Returns the integer HTTP status code when the response is not an
    error status, otherwise one of the strings "Error:<code>",
    "MaxRetries", "Timeout", "ConnectErr" or "UnexpectedErr".
    The request is given a 5 second timeout.
    """
    # transport-level failures, checked in this order, and the label each maps to
    failureLabels = (
        (requests.exceptions.RetryError, "MaxRetries"),
        (requests.exceptions.Timeout, "Timeout"),
        (requests.exceptions.ConnectionError, "ConnectErr"),
    )
    try:
        r = requests.get(urlIn, headers={'Metadata': 'True'}, timeout=5)
        r.raise_for_status()
        return r.status_code
    except requests.exceptions.HTTPError:
        return f"Error:{r.status_code}"
    except requests.exceptions.RequestException as failure:
        for failureType, label in failureLabels:
            if isinstance(failure, failureType):
                return label
        return "UnexpectedErr"
def isOpen(ip, port):
    """Return True when a TCP connection to *ip*:*port* can be established, else False.

    Only an IPv4 address is expected for *ip*; no DNS handling is intended.
    The attempt is limited to 2 seconds. Any failure (refused, timed out,
    unreachable, or an invalid port value) yields False rather than an
    exception.
    """
    try:
        # the context manager guarantees the socket is closed on every path
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(2)
            # connect_ex returns 0 on success and an errno otherwise; plain
            # connect() returns None, which can never compare equal to 0
            if s.connect_ex((ip, int(port))) != 0:
                return False
            try:
                s.shutdown(socket.SHUT_RDWR)
            except OSError:
                # the peer may already have dropped the connection; it was still open
                pass
            return True
    except (OSError, ValueError, TypeError, OverflowError):
        return False
def getInterfaces():
    """Return ``{ifname: {'mac': ..., 'ip': ...}}`` for every interface except "lo".

    Data comes from ``ip -j address show``. 'mac' is the interface's
    "address" field; 'ip' is only present when the interface has an IPv4
    ("inet") entry, and when there are several the last one listed wins.
    """
    # -- May have an issue with multiple VIPs on a NIC
    rawJSON = subprocess.run(['ip', '-j', 'address', 'show'], stdout=subprocess.PIPE).stdout
    found = {}
    for nic in json.loads(rawJSON.decode('utf-8')):
        nicName = nic.get('ifname')
        if nicName == "lo":
            continue
        entry = {'mac': nic.get('address')}
        for addr in nic.get('addr_info', []):
            # Only IPv4 addresses
            if addr.get('family') == 'inet':
                entry['ip'] = addr.get('local')
        found[nicName] = entry
    return found
# TODO:: repackage this search function as a generic re: search, and call it with the defined re for MAC addresses
def searchDirForMAC(dirToSearch, returnDict, okMAC):
    """Record every file under *dirToSearch* that hard-codes a MAC address.

    For each readable text file containing at least one colon-separated
    MAC address (e.g. 00:1A:2B:3C:4D:5E, any letter case),
    ``returnDict[file_path]`` is set to the list of MACs found in it.
    A file is skipped when one of its MACs equals *okMAC*: it is fine for
    cloud-init managed configs to define the current MAC, since cloud-init
    resets the configs if the MAC changes. That comparison ignores letter
    case, matching the case-insensitive search pattern.

    Files that cannot be read or decoded as text are silently skipped.
    Returns None; results are delivered through *returnDict*.
    """
    mac_pattern = re.compile(r'([0-9a-f]{2}(?::[0-9a-f]{2}){5})', re.IGNORECASE)
    # a non-string okMAC (e.g. None) can never match, so nothing is excluded
    ok_lower = okMAC.lower() if isinstance(okMAC, str) else None
    for root, _, files in os.walk(dirToSearch):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            # os.path.isfile follows symlinks: sockets, pipes and dangling links are skipped
            if not os.path.isfile(file_path):
                continue
            try:
                with open(file_path, 'r') as file:
                    content = file.read()
            except (UnicodeDecodeError, OSError):
                # unreadable (permissions, vanished, I/O error) or not text - ignore it
                continue
            # findall returns a list of strings, one per MAC found
            mac_addresses = mac_pattern.findall(content)
            if not mac_addresses:
                continue
            if ok_lower is not None and any(mac.lower() == ok_lower for mac in mac_addresses):
                continue
            returnDict[file_path] = mac_addresses
def searchDirForString(dirToSearch, returnDict, string):
    """Record every file under *dirToSearch* that contains *string*.

    For each matching file ``returnDict[file_path]`` is set to a single
    string holding one "<line number>: <stripped line>" entry per matching
    line, joined with newlines. Line numbers start at 1. Files that raise
    UnicodeDecodeError or PermissionError are skipped. Returns None.
    """
    for root, _, files in os.walk(dirToSearch):
        for file_name in files:
            file_path = os.path.join(root, file_name)
            # only paths that os.path.isfile accepts are opened
            if not os.path.isfile(file_path):
                continue
            try:
                with open(file_path, 'r') as handle:
                    hits = [
                        f"{line_number}: {text.strip()}"
                        for line_number, text in enumerate(handle, start=1)
                        if string in text
                    ]
            except (UnicodeDecodeError, PermissionError):
                # Skip files that can't be read due to encoding or permission issues
                continue
            # only create the dict entry when at least one line matched
            if hits:
                returnDict[file_path] = "\n".join(hits)
#### END main logic funcs
#### START main processing flow
# ToDo list from bash logstring: (delete when completed)
# LOGSTRING="$LOGSTRING|SERVICE=$SERVICE"
# LOGSTRING="$LOGSTRING|PY=$PY"
# LOGSTRING="$LOGSTRING|PYVERS=$PYVERSION"
# LOGSTRING="$LOGSTRING|PYCOUNT=$PYCOUNT"
# LOGSTRING="$LOGSTRING|PYREQ=$PYREQ"
# LOGSTRING="$LOGSTRING|PYALA=$PYALA"
logger.info(f"args were {parser.parse_args()}")
# log anything we've determined above
for _osLogLine in (
    f"OS family determined as {osrID}",
    f"OS Major Version={osMaj}",
    f"OS Minor Version={osMin}",
):
    logger.info(_osLogLine)
# Oldest major release per OS family that is still considered testable;
# a family missing from this table is reported as unsupported instead.
_osMinMajor = {"fedora": 8, "suse": 15, "debian": 20, "azurelinux": 3}
osFamOK = osrID in _osMinMajor
osOld = bool(osFamOK and osMaj < _osMinMajor[osrID])
if osOld:
    logger.warning(f"OS family detected as {osrID} with major version of {osMaj} - this OS is too old too be reliably tested")
    findings['osSup']={'description': 'OS is Old', 'status': f"OS Family:{osrID} with Major Release:{osMaj} is too old to be reliably tested"}
if not osFamOK:
    logger.warning(f"Unsupported OS family detected:{osrID}")
    findings['osSup']={'description': 'OS family is minimally or completely untested', 'status': f"OS Family:{osrID}"}
# Seed this script from the values handed over by the bash wrapper
waaServiceIn=bashArgs.get('SERVICE', "waagent.service") # this may differ per-distro, but offer a default
pythonIn=bashArgs.get('PY', "/usr/bin/python3")
# locate the agent executable through the shell's PATH lookup
waaBin=subprocess.check_output(
    "which waagent", shell=True, stderr=subprocess.DEVNULL
).decode().strip()
logger.info(f"using waagent location {waaBin}")
# collect every environment variable whose name mentions 'proxy' (any case)
osEnv=dict(os.environ)
proxyVars = {envName: envValue for envName, envValue in osEnv.items() if "proxy" in envName.lower()}
# always record the check; add a finding only when something was found
checks['proxy']={"check":"proxy", "value":proxyVars if proxyVars else "None Found"}
if proxyVars:
    logger.info(f"proxy definition found in env: {proxyVars}")
    findings['proxy']={'description': 'ProxyCheck', 'status': f"Found proxy environment vars:\n{proxyVars}"}
else:
    logger.info(f"No proxies found in env")
# Check services and binaries
checkService(waaServiceIn, package=True)
# Check SSHD, Debian based distros started naming it ssh and launching on connect, sometime before Ubuntu 24.04
# TODO: make this version dependent - ubuntu 24.04+ uses JIT activation of sshd
checkService("ssh.service" if osrID == "debian" else "sshd.service", package=True)
# python first, then openssl as a PoC (changing SSL can cause problems for extensions
# talking outside wire/IMDS), then the agent itself as another easy-to-check test
for _binToCheck in (pythonIn, "/usr/bin/openssl", waaBin):
    validateBin(_binToCheck)
# Pull the versions out of the 'normal' --version output, for manual comparisons.
# If the output changes format this block has to be recoded. Expected output:
#['walinuxagent-2.7.0.6 running on redhat 8.10',
# 'python: 3.6.8',
# 'goal state agent: 2.7.0.6']
_waaVerRaw = subprocess.check_output(f"{waaBin} --version", shell=True, stderr=subprocess.DEVNULL)
waaVerOut = _waaVerRaw.decode().strip().lower().split('\n')
waaVer = waaGoalVer = "0.0.0.0"
for line in waaVerOut:
    # three dotted numbers with an optional fourth, since some versions only have 3 parts
    verSearch = re.search(r'\d+\.\d+\.\d+(\.\d+)?', line)
    if not verSearch:
        continue
    # the agent line is checked first, so a line naming both counts as the agent
    if "walinuxagent" in line:
        waaVer = verSearch.group(0)
    elif "goal" in line:
        waaGoalVer = verSearch.group(0)
# log the check
checks["waaVersion"] = dict(
    description='Agent component versions',
    check='waaVersion',
    value=f"WAA:{waaVer}, Goal:{waaGoalVer}",
    type='config',
)
logger.info(f"Found agent:{waaVer} and extension handler: {waaGoalVer}")
# matching versions are a 'finding' - they only match if autoUpg is false or the package is VERY new so likely from source
if waaGoalVer == waaVer:
    logger.info(f"PA and Goal match version {waaVer} - this is probably bad!")
    findings['waaVers']={'description': 'Agent/Goal versions', 'status': f"Agent version and goal state match = {waaVer} - this is unlikely"}
## turn service/bins checks into 'checks' and 'findings'
### Binaries
#### string for the console report
binReportString=""
for binName, _binInfo in bins.items():
    _binExe = _binInfo['exe']
    _binRepo = _binInfo['repo']
    checks[_binExe] = {
        'check': _binExe,
        'description': f"Binary check of {_binExe}",
        'value': f"Package:{_binInfo['pkg']}, source:{_binRepo}",
    }
    # alarm 1: the path mentions a questionable area - local, home, opt - these aren't "normal"
    if any(re.search(area, _binExe) for area in ("local", "opt", "home")):
        findings[f"bp:{_binExe}"]={
            'description': f"binpath:{_binExe}",
            'status': "Path includes questionable directories",
            'type': "bin"
        }
        logger.warning(f"Checking path: {_binExe} found in a non-standard location")
        binReportString+=f"{cYellow(_binExe)} => check location\n"
    # alarm 2: the package came from an uncommon repository
    if osrID == "debian":
        # this should usually say "Origin: Ubuntu"
        # We are blissfully ignoring *actual* Debian - which itself would be a cause for concern
        repoBad = not re.search(r"Origin: Ubuntu", _binRepo)
    elif osrID in ("fedora", "azurelinux"):
        # bad when the repo text carries the error indicator 'fail', or when it names none of
        # @System / anaconda (initial install for RHEL or AL), rhui, AppStream (ok-ish), azurelinux
        repoBad = bool(re.search("fail", _binRepo)) or not any(
            re.search(okRepo, _binRepo)
            for okRepo in (r"@System", "anaconda", "rhui", "AppStream", "azurelinux")
        )
    elif osrID == "suse":
        # expected repositories include 'SLE-Module'
        repoBad = not re.search(r"SLE-Module", _binRepo)
    else:
        repoBad = False
    # all distro-specific checks finished, report if needed
    if repoBad:
        findings[f"bs:{_binExe}"]={
            'description': f"binsource:{_binExe}",
            'status': f"Binary came from unusual source: {_binRepo}",
            'type': "bin"
        }
        logger.warning(f"Checking {_binExe} found to be sourced from the repo {_binRepo}")
        binReportString+=f"{_binExe} => {cRed(_binRepo)} - verify repository\n"
if not binReportString:
    binReportString=cGreen("-- No issues with checked binaries")
    logger.info("No concerns found with binary checks")
### Services/Units
svcReportString=""
for svcName, _svcInfo in services.items():
    _svcUnit = _svcInfo['svc']
    # a unit whose status text lacks 'running' gets a run-state finding
    _svcState = _svcInfo['status']
    if re.search("running", _svcState) is None:
        findings[f"ss:{_svcUnit}"]={
            'description': f"service:{_svcUnit}",
            'status': f"Service not in 'running' state: {_svcState}",
            'type': "svc"
        }
        logger.warning(f"Checking {_svcUnit} found in state {_svcState}")
        svcReportString+=f"{_svcUnit} => {cRed(_svcState)} - check logs\n"
    # a unit whose unit-file state lacks 'enabled' gets a config finding
    _svcConfig = _svcInfo['config']
    if re.search("enabled", _svcConfig) is None:
        findings[f"sc:{_svcUnit}"]={
            'description': f"service:{_svcUnit}",
            'status': f"Service not enabled: {_svcConfig}",
            'type': "svc"
        }
        logger.warning(f"Checking {_svcUnit} not enabled: {_svcConfig}")
        svcReportString+=f"{_svcUnit} => {cRed(_svcConfig)} - check config\n"
if not svcReportString:
    svcReportString=cGreen("-- No issues with checked services")
    logger.info("No concerns found with service checks")
## Early version report code
# print(f"Analysis of unit : {services[svcName]['svc']}:")
# print(f" Owning pkg : {services[svcName]['pkg']}" )
# print(f" Repo for pkg : {services[svcName]['repo']}" )
# print( " run state : "+colorString(services[svcName]['status'], redVal="dead", greenVal="active"))
# print( " config state : "+colorString(services[svcName]['config'], redVal="disabled", greenVal="enabled"))
# Connectivity checks
# Each probe always records an entry in 'checks'; a 'findings' entry is added
# only when the probe fails.
## Wire server (HTTP, port 80)
wireCheck=checkHTTPURL(f"http://{wireIP}/?comp=versions")
checks['wire']={"check":"wire 80", "value":wireCheck}
if wireCheck != 200:
    findings['wire80']={
        'description': 'WireServer:80',
        'status': wireCheck,
        'type': "conn"
    }
    logger.warning(f"Wire server port 80 check returned {wireCheck} - check connectivity")
else:
    logger.info(f"Wire server port 80 check returned OK({wireCheck})")
## Wire server "extension" port
wireExt=isOpen(wireIP,32526)
# the label names the port that is actually probed (32526)
checks['wireExt']={"check":"wire 32526", "value":wireExt}
if not wireExt:
    # NOTE: the findings key keeps its historical spelling ('wire23526') so any
    # consumer that looks it up by key keeps working; the port tested is 32526
    findings['wire23526']={
        'description': 'WireServer:32526',
        'status': wireExt,
        'type': "conn"
    }
    logger.warning(f"Wire server extension port (32526) test returned {wireExt} - check connectivity")
else:
    logger.info(f"Wire server extension port (32526) returned OK({wireExt})")
## IMDS
imdsCheck=checkHTTPURL(f"http://{imdsIP}/metadata/instance?api-version=2021-02-01")
# the probe is plain HTTP, so the label names port 80
checks['imds']={"check":"imds 80", "value":imdsCheck}
if imdsCheck != 200:
    findings['imds']={
        'description': 'IMDS',
        'status': imdsCheck,
        'type': "conn"
    }
    logger.warning(f"IMDS port 80 check returned {imdsCheck} - check connectivity")
else:
    logger.info(f"IMDS port 80 check returned OK({imdsCheck})")
# Secondary test for ext. handler version/auto upgrade
# if the wire port state is 200(OK), query the wireserver for the latest goalstate (ext. handler) and check against the current goal state
if wireCheck == 200:
    # We only use these modules in here - so far
    import xml.etree.ElementTree as ET
    from urllib.parse import urlparse
    # Pre-set so the except/finally blocks can always reference it, even when a
    # request or XML lookup fails before the wire server version was read.
    wireGSVersion = "unknown"
    conn = http.client.HTTPConnection(wireIP, timeout=10)
    try:
        # get the best API version *from the wire server
        endpoint="/?comp=versions"
        headers = {
            "Accept": "application/xml", # Requesting XML response
            "User-Agent": "VM assist" # Optional, helps identify the client
        }
        conn.request("GET", endpoint, headers=headers)
        response = conn.getresponse()
        xmlResp=response.read()
        apiVers=ET.fromstring(xmlResp).find("./Preferred/Version").text
        # Find the URLs for the different bits of the goal state
        endpoint="/machine/?comp=goalstate"
        headers = {
            "x-ms-version": apiVers,
            "Accept": "application/xml", # Requesting XML response
            "User-Agent": "PythonTestHarness" # Optional, helps identify the client
        }
        conn.request("GET", endpoint, headers=headers)
        response = conn.getresponse()
        xmlResp=response.read().decode()
        extConfURL=ET.fromstring(xmlResp).find("./Container/RoleInstanceList/RoleInstance/Configuration/ExtensionsConfig").text
        # the extensions config lives on the same host, so only path and query are reused
        parsedURL=urlparse(extConfURL)
        endpoint = parsedURL.path + "?" + parsedURL.query
        conn.request("GET", endpoint, headers=headers)
        response = conn.getresponse()
        xmlResp=response.read().decode()
        wireGSVersion=ET.fromstring(xmlResp).find("./GuestAgentExtension/GAFamilies/GAFamily/Version").text
        if wireGSVersion != waaGoalVer:
            findings['waaUpgStat']={'status': f"not up to date - Local:{waaGoalVer} Wire:{wireGSVersion}", 'description':"GoalState version mismatch to wireserver"}
    except Exception as gsErr:
        # best effort: any network, HTTP or XML problem becomes a finding, not a crash
        logger.warning(f"Goal state query against wire server failed: {gsErr}")
        findings['waaUpgStat']={'status': "failed during testing", 'description':f"Could not compare GoalState version on VM ({waaGoalVer}) with wire server ({wireGSVersion})"}
    finally:
        conn.close()
        checks['waaUpgStat']={"check":"GoalVersion", "description":"Checking Goal State version against wire server", "value":wireGSVersion}
else:
    # flag that we skipped wireserver capability checks due to failing connectivity checks
    findings['waaUpgStat']={'status': "skipped", 'description':"Did not check GoalState version on wire server"}
# OS/config checks
## Agent config
# Run the agent binary with --show-configuration and turn its "key = value" output
# into a dict, then check the two settings this report cares about.
waaConfigOut = subprocess.check_output(f"{waaBin} --show-configuration", shell=True, stderr=subprocess.DEVNULL).decode().strip().split('\n')
waaConfig = {}
# put all output from the config command into a KVP
for line in waaConfigOut:
    key, sep, value = line.partition('=')
    if not sep:
        # Blank line or free text without '=' - nothing to record. (Unpacking the
        # result of split('=', 1) on such a line would raise ValueError.)
        continue
    waaConfig[key.strip()] = value.strip()

# A setting that is absent from the output is reported as 'unknown' (and therefore
# flagged below) instead of aborting the whole report with a KeyError.
checks['waaExt'] = {"check": "WAA Extension", "value": waaConfig.get('Extensions.Enabled', 'unknown')}
if checks['waaExt']['value'] != 'True':
    findings['waaExt'] = {'status': checks['waaExt']['value'], 'description': "Extensions are disabled in WAA config"}
    logger.warning(f"Extensions potentially disabled: {checks['waaExt']['value']}")
else:
    logger.info(f"Extensions enabled in waagent config {checks['waaExt']['value']}")

checks['waaUpg'] = {"check": "WAA AutoUpgrade", "value": waaConfig.get('AutoUpdate.Enabled', 'unknown')}
if checks['waaUpg']['value'] != 'True':
    findings['waaUpg'] = {'status': checks['waaUpg']['value'], 'description': "Agent extension handler auto-upgrade is disabled in WAA config"}
    logger.warning(f"Agent(ext handler) auto-update possibly disabled: {checks['waaUpg']['value']}")
else:
    logger.info(f"Agent(ext handler) auto-update enabled in waagent config {checks['waaUpg']['value']}")
# Checks against disks and objects
# Only needed in this section: shell-style splitting of findmnt's KEY="value" output
import shlex

## results of disk space checks
### seed checks with a 'no problems' message, we'll reset it when we find one
checks['fullFS'] = {"check": "fullFS", "description": f"filesystem util over {fullPercent}%", "none": f"No filesystems over {fullPercent}% util"}
diskFind = ""
## find the device 'id' for checking if the extension directory is 'noexec'
vlwaDev = os.stat("/var/lib/waagent").st_dev
mounts = []
# only check these filesystem types ext4,xfs,vfat,btrfs,ext3
findmnt = subprocess.check_output("findmnt --evaluate -nb -o TARGET,SOURCE,FSTYPE,OPTIONS,USE% --pairs -t=ext2,ext3,ext4,btrfs,xfs,vfat", shell=True, stderr=subprocess.DEVNULL).decode().strip().split("\n")
for fm in findmnt:
    if not fm.strip():
        # An empty line would yield an entry without TARGET/SOURCE keys
        continue
    dictTemp = {}
    # Each line is a series of KEY="value" pairs. A plain whitespace split would
    # cut a quoted value containing a space (e.g. a mount point like "/mnt/my dir")
    # into pieces, so split with shell quoting rules instead.
    for pair in shlex.split(fm):
        key, sep, value = pair.partition('=')
        if not sep:
            continue
        # drop any leftover quotes and the trailing '%' of the utilization column
        dictTemp[key] = value.strip('"%')
    mounts.append(dictTemp)
# this was initially done in psutils:
#   mounts = psutil.disk_partitions()
# but was found that certain distros do not include psutils in their marketplace images, so re-wrote with generic python code
for m in mounts:
    logger.info(f"Checking {m['SOURCE']} mounted at {m['TARGET']}")
    # the following hack brought to you by SLES, where USE% is instead USE_PCT
    pcent = 0
    if 'USE%' in m:
        pcent = m['USE%']
    elif 'USE_PCT' in m:
        pcent = m['USE_PCT']
    if int(pcent) >= fullPercent:
        logger.warning(f"Filesystem utilization for {m['TARGET']} is over {fullPercent}: {pcent}")
        # delete the 'default empty set' wording in 'checks' for fullFS, because we found a disk over the util threshold
        if 'none' in checks["fullFS"]:
            checks['fullFS'] = {'check': 'fullFS', 'description': f'Look for filesystems utilized more than {fullPercent}', 'value': 'see findings for details'}
            findings['fullFS'] = {}
        # Add each full filesystem to the list
        if 'status' in findings['fullFS']:
            findings['fullFS']['status'] = f"{findings['fullFS']['status']}, {m['TARGET']}:{pcent}"
        else:
            findings['fullFS'] = {'description': f"Filesystems over{fullPercent}",
                                  'status': f"{m['TARGET']}:{pcent}",
                                  'type': 'os'
                                  }
    # check if this mount (m) is the one holding /var/lib/waagent, if so we will want to check to see if the mount options include 'noexec'
    if os.stat(m['TARGET']).st_dev == vlwaDev:
        logger.info(f"Found /var/lib/waagent based in filesystem {m['TARGET']} on device {m['SOURCE']}, checking mount options")
        # create the 'checks' data describing this
        checks['noexec'] = {
            'description': f"Checking mount options for noexec on {m['SOURCE']}",
            'check': 'noexec',
            'value': m['TARGET']
        }
        # add the 'findings' data if it's bad
        if re.search("noexec", m['OPTIONS']):
            # Found noexec so flag it
            logger.error(f"mountpoint {m['TARGET']} mounted with 'noexec'")
            findings['noexec'] = {
                'description': "Found /var/lib/waagent with noexec bit set",
                'status': True
            }
## Networking
# Collect every interface with its addresses
ints = getInterfaces()
# There is no reliable way to tell whether eth0 is static or DHCP, so the later checks
# search the usual network config directories for eth0's current IP and MAC. Finding
# either one hard coded in a file is worth reporting.
# Start from dummy values so the searches still have something to look for when eth0
# is missing.
eth0MAC = "de:ad:be:ef:4a:11"
eth0IP = "128.0.128.255"
if 'eth0' not in ints:
    # Without eth0 many checks (and possibly the system itself) are in trouble, so log
    # it loudly and record a finding showing the dummy values in use.
    logger.error(f"Could not find a definition for eth0 - is networking sound?")
    findings['noETH0'] = {
        'description': "Could not locate an active eth0",
        'status': f"eth0 - MAC:{eth0MAC}|IP{eth0IP}",
        'type': 'os'
    }
else:
    # eth0 exists: use its real MAC and IP for the config file searches
    eth0MAC = ints['eth0']['mac']
    eth0IP = ints['eth0']['ip']
    logger.info(f"Found {eth0MAC} on eth0, using this for config checks")
filesWithIP = {}
# Searching all of /etc would be slow and would match unrelated service configs, so
# only the usual network config directories are searched for the eth0 IP.
for netDir in ("/etc/sysconfig", "/etc/netplan"):
    searchDirForString(netDir, filesWithIP, eth0IP)
if not filesWithIP:
    checks['IPs'] = {"check": "Static IP addresses", "value": "No IP addresses found in configs"}
    logger.info(f"Did not find IP configured on eth0 listed in any config files")
else:
    checks['IPs'] = {"check": "Static IP addresses", "value": "IP found in files- see findings"}
    # comma separated list of every file that matched
    fileString = ", ".join(f"{foundFile}" for foundFile in filesWithIP)
    # create the 'findings' entry
    findings['staticIP'] = {'description': 'eth0 IP found in files', 'status': fileString}
    logger.warning(f"Found eth0 IP:{eth0IP} defined in a config file, could be static - check findings report")
filesWithMACs = {}
# All of /etc cannot be searched because certain SSL configs contain strings that look
# like MAC addresses; these two directories should cover all common distros.
for netDir in ("/etc/sysconfig", "/etc/netplan"):
    searchDirForMAC(netDir, filesWithMACs, eth0MAC)
if not filesWithMACs:
    checks['MACs'] = {"check": "MAC addresses", "value": "No MAC addresses found in configs"}
    logger.info(f"Did not find MAC on eth0 listed in any config files")
else:
    checks['MACs'] = {"check": "MAC addresses", "value": "MACs found - see findings"}
    # comma separated "file=>first entry recorded for that file" list
    fileString = ", ".join(
        f"{macFile}=>{macHits[0]}" for macFile, macHits in filesWithMACs.items()
    )
    # create the 'findings' entry
    findings['badMAC'] = {'description': 'MACs found', 'status': fileString}
    logger.warning(f"Found eth0 MAC:{eth0MAC} defined in a config file - check findings report")
# END ALL CHECKS
# START OUTPUT
# Console report: a fixed set of core items first, then the pre-built binary and
# service summaries, then every entry collected in 'findings'.
print("------ vmassist.py results ------")
print("Please see https://aka.ms/vmassistlinux for guidance on the information in the above output")
print(f"OS family : {osrID}")
# things we will always report on:
## WAA service => services[waaServiceIn]
agentSvc = services[waaServiceIn]
print(f"Agent service : {agentSvc['svc']}")
print(f"=> status : {colorString(agentSvc['status'])}")
print(f"=> config state : {colorString(agentSvc['config'], redVal='disabled', greenVal='enabled')}")
print(f"=> source pkg : {agentSvc['pkg']}")
print(f"=> repository : {agentSvc['repo']}")
print(f"Agent version from running {waaBin} --version")
print(f"=> Main version : {waaVer}")
print(f"=> Goal state : {colorString(waaGoalVer,redVal=waaVer,greenVal=wireGSVersion)}")
# Example of a services entry, as produced by checkService(waaServiceIn, package=True):
# {'walinuxagent.service': {'svc': 'walinuxagent.service', 'status': 'active(running)', 'config': 'enabled', 'path': '/usr/lib/systemd/system/walinuxagent.service', 'pkg': 'walinuxagent', 'repo': 'Origin: Ubuntu'}}
print(f"Wire Server")
print(f" port 80 : {colorString(checks['wire']['value'], redVal='404', yellowVal='timeout', greenVal='200')}")
print(f" port 32526 : {colorString(checks['wireExt']['value'], redVal='false', greenVal='true')}")
print(f"IMDS : {colorString(checks['imds']['value'], redVal='404', yellowVal='timeout', greenVal='200')}")
# Always say something about disk: the seeded 'no problems' text while it is still in
# place, otherwise the list of filesystems recorded in findings
diskSummary = checks['fullFS']['none'] if 'none' in checks["fullFS"] else findings['fullFS']['status']
print(f"Disk util > {fullPercent}% : {diskSummary}")
# TODO: clean up and verify color on all core checks - wire server, waagent status
# TODO: optionally output all 'checks' objects
# Output the pre-determined binary findings
print("- Binary check results:")
print(binReportString)
print("- Service check results:")
print(svcReportString)
# TODO: parse findings list
print("- Findings from all checks:")
if not findings:
    print(cGreen("-- No notable findings!"))
else:
    print(cYellow("-- All Findings (may duplicate Service and Binary checks) ---"))
    for finding in findings.values():
        print(f"--- {finding['description']} : {finding['status']}")
    print(cYellow("-- END Findings ---"))
# TODO: add the core checks not covered in findings, bins, and services, to the logs
### Log the raw data - don't send to the console
# Each data structure is written to the log as a heading line followed by its str() form.
logger.info("--- verbose output of data structures ---")
for dumpTitle, dumpObj in (
    ("----- Binary check data structure:", bins),
    ("----- Service checks data structure:", services),
    ("----- All \"checks\" data structure:", checks),
    ("----- All \"findings\" data structure:", findings),
):
    logger.info(dumpTitle)
    logger.info(str(dumpObj))
logger.info("--- END data structures ---")
# DEBUG
# Semi-debug output, good enough until the checks/findings presentation is built up.
# Shown only when at least one -v/--verbose flag was given.
if args.verbose > 0:
    print("--- Verbose binary check output")
    for binInfo in bins.values():
        print(f"Analysis of : {binInfo['exe']}:")
        print(f" Owning pkg : {binInfo['pkg']}")
        print(f" Repo for pkg : {binInfo['repo']}")
    print("--- Verbose service check output")
    for svcInfo in services.values():
        print(f"Analysis of unit : {svcInfo['svc']}:")
        print(f" Owning pkg : {svcInfo['pkg']}")
        print(f" Repo for pkg : {svcInfo['repo']}")
        print(" run state : " + colorString(svcInfo['status'], redVal="dead", greenVal="active"))
        print(" config state : " + colorString(svcInfo['config'], redVal="disabled", greenVal="enabled"))
    print("--- END Verbose output")
# END DEBUG
print("------ END vmassist.py output ------")
logger.info("Python ended")
#if ( args.debug ):
# print("------------ DATA STRUCTURE DUMP ------------")
# # For development testing, These are the last pprint calls
# from pprint import pprint
# print("bins")
# pprint(bins)
# print("services")
# pprint(services)
# print("findings")
# pprint(findings)
# print("checks")
# pprint(checks)
# print("args")
# pprint(args)
# print("---------- END DATA STRUCTURE DUMP ----------")