AzureMonitorAgent/ama_tst/modules/connect/check_imds.py (65 lines of code) (raw):
import subprocess
import json
from error_codes import *
from errors import error_info
from helpers import general_info, geninfo_lookup, is_arc_installed
METADATA_CMD = 'curl -s -H Metadata:true --noproxy "*" "http://{0}/metadata/instance/compute?api-version=2020-06-01"'
AZURE_IP = "169.254.169.254"
ARC_IP = "127.0.0.1:40342"
AZURE_TOKEN_CMD = "curl 'http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=https%3A%2F%2Fmanagement.azure.com%2F' -H Metadata:true -s"
ARC_TOKEN_CMD = 'ChallengeTokenPath=$(curl -s -D - -H Metadata:true "http://127.0.0.1:40342/metadata/identity/oauth2/token?api-version=2019-11-01&resource=https%3A%2F%2Fmanagement.azure.com"'\
'| grep Www-Authenticate | cut -d "=" -f 2 | tr -d "[:cntrl:]") ; ' \
'ChallengeToken=$(cat $ChallengeTokenPath) ; ' \
'curl -s -H Metadata:true -H "Authorization: Basic $ChallengeToken" "http://127.0.0.1:40342/metadata/identity/oauth2/token?api-version=2019-11-01&resource=https%3A%2F%2Fmanagement.azure.com"'
def check_metadata():
global general_info
type = "Azure"
if is_arc_installed():
command = METADATA_CMD.format(ARC_IP)
type = "Hybrid"
else:
command = METADATA_CMD.format(AZURE_IP)
try:
output = subprocess.check_output(command, shell=True,\
stderr=subprocess.STDOUT, universal_newlines=True)
output_json = json.loads(output)
attributes = ['azEnvironment', 'resourceId', 'location']
for attr in attributes:
if not attr in output_json:
error_info.append((type, command, output))
return ERR_IMDS_METADATA
else:
attr_result = output_json[attr]
general_info[attr] = attr_result
except Exception as e:
error_info.append((type, command, e))
return ERR_IMDS_METADATA
return NO_ERROR
def check_token():
if is_arc_installed():
command = ARC_TOKEN_CMD
else:
command = AZURE_TOKEN_CMD
try:
# check AMA use UAI
managed_identity = geninfo_lookup('MANAGED_IDENTITY')
if not managed_identity == None:
managed_identity = managed_identity.replace('mi_res_id#', 'mi_res_id=')
command = command.replace('token?', 'token?{0}&'.format(managed_identity))
output = subprocess.check_output(command, shell=True,\
stderr=subprocess.STDOUT, universal_newlines=True)
output_json = json.loads(output)
if not 'access_token' in output_json:
error_info.append((command, output))
return ERR_ACCESS_TOKEN
except Exception as e:
error_info.append((command, e))
return ERR_ACCESS_TOKEN
return NO_ERROR
def check_imds_api():
# check metadata
checked_metadata = check_metadata()
if not checked_metadata == NO_ERROR:
return checked_metadata
# check access token
checked_token = check_token()
if not checked_token == NO_ERROR:
return checked_token
return NO_ERROR