in AzureMonitorAgent/agent.py [0:0]
def metrics_watcher(hutil_error, hutil_log):
    """
    Watcher thread to monitor metric configuration changes and to take action on them.

    Runs forever, waking every 30 seconds to:
      * write the fluent port from AMAFluentPortFilePath into the fluent config
        (FluentCfgPath) and, where SELinux is not disabled and semanage exists,
        label that port as http_port_t;
      * call restart_launcher() whenever the fluent config content hash differs
        from the last one seen (including the first read);
      * if the metrics counter config (MdsdCounterJsonPath) is an empty JSON
        collection, stop and remove telegraf and MetricsExtension (ME);
        otherwise (re)configure and start them whenever the config content
        changes, refresh the ME MSI token when it is unknown or expires within
        30 minutes, and restart telegraf/ME when their processes are not
        running (at most max_restart_retries consecutive attempts each).

    :param hutil_error: callable taking one message string, used for error logging
    :param hutil_log: callable taking one message string, used for info logging

    Any exception raised during an iteration is logged through hutil_error and
    the loop continues after the sleep.
    """
    # Check every 30 seconds
    sleepTime = 30
    # Maximum consecutive restart attempts for telegraf / ME before giving up.
    max_restart_retries = 10

    def _log_outcome(succeeded, message):
        # Service helpers return (success, message); route the message to the matching log.
        if succeeded:
            hutil_log(message)
        else:
            hutil_error(message)

    def _read_file(path):
        # Read a whole text file, always closing the handle.
        with open(path, "r") as f:
            return f.read()

    def _read_token_expiry(path):
        # Return the "expires_on" value from the cached ME MSI token file, or None
        # when the file is missing, unreadable, not a JSON object, or lacks the key.
        # None makes the caller request a fresh token.
        if not os.path.isfile(path):
            return None
        try:
            token = json.loads(_read_file(path))
        except (IOError, OSError, ValueError):
            return None
        if isinstance(token, dict):
            return token.get("expires_on") or None
        return None

    def _sync_fluent_port(fluent_port):
        # Rewrite every line containing 'Port' in the fluent config with the given
        # port, unless some line already sets exactly that port.
        port_line = re.compile(r'^\s{0,}Port\s{1,}' + re.escape(fluent_port) + '$')
        with open(FluentCfgPath, 'r') as f:
            already_set = any(port_line.search(line) for line in f)
        if already_set:
            return
        port_setting = " Port " + fluent_port + "\n"
        default_port_setting = 'Port'
        # With inplace=True, fileinput redirects stdout into the file being
        # edited, so print() writes the new content; a .bak copy is kept.
        with contextlib.closing(fileinput.FileInput(FluentCfgPath, inplace=True, backup='.bak')) as cfg:
            for line in cfg:
                if default_port_setting in line:
                    print(port_setting, end='')
                else:
                    print(line, end='')
        os.chmod(FluentCfgPath, stat.S_IRGRP | stat.S_IRUSR | stat.S_IWUSR | stat.S_IROTH)

    def _allow_fluent_port_in_selinux(fluent_port):
        # Label fluent_port as http_port_t unless SELinux is disabled, semanage is
        # missing, or the port already appears with http_port_t in the SELinux files.
        sedisabled, _ = run_command_and_log('getenforce | grep -i "Disabled"', log_cmd=False, log_output=False)
        if sedisabled == 0:
            # grep matched "Disabled": nothing to do.
            return
        check_semanage, _ = run_command_and_log("which semanage", log_cmd=False, log_output=False)
        if check_semanage != 0:
            return
        # also check SELinux config paths for Oracle/RH
        for search_dir in ('/var/lib/selinux', '/etc/selinux'):
            fluentPortEnabled, _ = run_command_and_log('grep -Rnw ' + search_dir + ' -e http_port_t | grep ' + fluent_port, log_cmd=False, log_output=False)
            if fluentPortEnabled == 0:
                return
        # allow the fluent port in SELinux
        run_command_and_log('semanage port -a -t http_port_t -p tcp ' + fluent_port, log_cmd=False, log_output=False)

    # Retrieve managed identity info that may be needed for token retrieval
    identifier_name, identifier_value, error_msg = get_managed_identity()
    if error_msg:
        hutil_error('Failed to determine managed identity settings; MSI token retreival will rely on default identity, if any. {0}.'.format(error_msg))
    if identifier_name and identifier_value:
        managed_identity_str = "uai#{0}#{1}".format(identifier_name, identifier_value)
    else:
        managed_identity_str = "sai"
    # Sleep before starting the monitoring
    time.sleep(sleepTime)
    last_crc = None
    last_crc_fluent = None
    me_msi_token_expiry_epoch = None
    # Restart counters must persist across iterations; resetting them inside the
    # loop would make the max_restart_retries cap unreachable.
    telegraf_restart_retries = 0
    me_restart_retries = 0
    while True:
        try:
            # update fluent config for fluent port if needed
            fluent_port = ''
            if os.path.isfile(AMAFluentPortFilePath):
                # Strip so a trailing newline does not leak into the config line,
                # the regex, or the shell commands built from the port.
                fluent_port = _read_file(AMAFluentPortFilePath).strip()
            if fluent_port != '' and os.path.isfile(FluentCfgPath):
                _sync_fluent_port(fluent_port)
            # add SELinux rules if needed
            if os.path.exists('/etc/selinux/config') and fluent_port != '':
                _allow_fluent_port_in_selinux(fluent_port)
            if os.path.isfile(FluentCfgPath):
                data = _read_file(FluentCfgPath)
                if data != '':
                    crc_fluent = hashlib.sha256(data.encode('utf-8')).hexdigest()
                    if crc_fluent != last_crc_fluent:
                        restart_launcher()
                        last_crc_fluent = crc_fluent
            if os.path.isfile(MdsdCounterJsonPath):
                data = _read_file(MdsdCounterJsonPath)
                if data != '':
                    json_data = json.loads(data)
                    crc = hashlib.sha256(data.encode('utf-8')).hexdigest()
                    if len(json_data) == 0:
                        # No metrics configured: stop and delete telegraf and ME if running.
                        last_crc = crc
                        if telhandler.is_running(is_lad=False):
                            tel_out, tel_msg = telhandler.stop_telegraf_service(is_lad=False)
                            _log_outcome(tel_out, tel_msg)
                            tel_rm_out, tel_rm_msg = telhandler.remove_telegraf_service(is_lad=False)
                            _log_outcome(tel_rm_out, tel_rm_msg)
                        if me_handler.is_running(is_lad=False):
                            me_out, me_msg = me_handler.stop_metrics_service(is_lad=False)
                            _log_outcome(me_out, me_msg)
                            me_rm_out, me_rm_msg = me_handler.remove_metrics_service(is_lad=False)
                            _log_outcome(me_rm_out, me_rm_msg)
                    else:
                        if crc != last_crc:
                            # Resetting the me_msi_token_expiry_epoch variable if we set up ME again.
                            me_msi_token_expiry_epoch = None
                            # A new configuration gets a fresh restart budget.
                            telegraf_restart_retries = 0
                            me_restart_retries = 0
                            hutil_log("Start processing metric configuration")
                            hutil_log(data)
                            _telegraf_config, _telegraf_namespaces = telhandler.handle_config(
                                json_data,
                                "unix:///run/azuremonitoragent/mdm_influxdb.socket",
                                "unix:///run/azuremonitoragent/default_influx.socket",
                                is_lad=False)
                            cwd = os.getcwd()
                            me_service_template_path = cwd + "/services/metrics-extension.service"
                            if os.path.exists(me_service_template_path):
                                os.remove(me_service_template_path)
                            if is_feature_enabled("enableCMV2"):
                                copyfile(cwd + "/services/metrics-extension-otlp.service", me_service_template_path)
                            else:
                                copyfile(cwd + "/services/metrics-extension-cmv1.service", me_service_template_path)
                            me_handler.setup_me(is_lad=False, managed_identity=managed_identity_str, HUtilObj=HUtilObject)
                            start_telegraf_res, log_messages = telhandler.start_telegraf(is_lad=False)
                            if start_telegraf_res:
                                hutil_log("Successfully started metrics-sourcer.")
                            else:
                                hutil_error(log_messages)
                            start_metrics_out, log_messages = me_handler.start_metrics(is_lad=False, managed_identity=managed_identity_str)
                            if start_metrics_out:
                                hutil_log("Successfully started metrics-extension.")
                            else:
                                hutil_error(log_messages)
                            last_crc = crc

                        generate_token = False
                        # Relative components on purpose: an absolute second argument
                        # makes os.path.join discard os.getcwd().
                        # NOTE(review): assumed to be where me_handler.generate_MSI_token
                        # caches the token — confirm.
                        me_token_path = os.path.join(os.getcwd(), "config", "metrics_configs", "AuthToken-MSI.json")
                        if not me_msi_token_expiry_epoch:
                            me_msi_token_expiry_epoch = _read_token_expiry(me_token_path)
                            if not me_msi_token_expiry_epoch:
                                generate_token = True
                        if me_msi_token_expiry_epoch:
                            # Compare in epoch seconds to avoid naive-local-time (DST) errors.
                            seconds_left = float(me_msi_token_expiry_epoch) - time.time()
                            if seconds_left < 30 * 60:
                                # The MSI Token will expire within 30 minutes. We need to refresh the token
                                generate_token = True
                        if generate_token:
                            msi_token_generated, me_msi_token_expiry_epoch, log_messages = me_handler.generate_MSI_token(identifier_name, identifier_value, is_lad=False)
                            if msi_token_generated:
                                hutil_log("Successfully refreshed metrics-extension MSI Auth token.")
                            else:
                                hutil_error(log_messages)

                        # Check if telegraf is running, if not, then restart
                        if telhandler.is_running(is_lad=False):
                            telegraf_restart_retries = 0
                        elif telegraf_restart_retries < max_restart_retries:
                            telegraf_restart_retries += 1
                            hutil_log("Telegraf binary process is not running. Restarting telegraf now. Retry count - {0}".format(telegraf_restart_retries))
                            tel_out, tel_msg = telhandler.stop_telegraf_service(is_lad=False)
                            _log_outcome(tel_out, tel_msg)
                            start_telegraf_res, log_messages = telhandler.start_telegraf(is_lad=False)
                            if start_telegraf_res:
                                hutil_log("Successfully started metrics-sourcer.")
                            else:
                                hutil_error(log_messages)
                        else:
                            hutil_error("Telegraf binary process is not running. Failed to restart after {0} retries. Please check telegraf.log".format(max_restart_retries))

                        # Check if ME is running, if not, then restart
                        if me_handler.is_running(is_lad=False):
                            me_restart_retries = 0
                        elif me_restart_retries < max_restart_retries:
                            me_restart_retries += 1
                            hutil_log("MetricsExtension binary process is not running. Restarting MetricsExtension now. Retry count - {0}".format(me_restart_retries))
                            me_out, me_msg = me_handler.stop_metrics_service(is_lad=False)
                            _log_outcome(me_out, me_msg)
                            start_metrics_out, log_messages = me_handler.start_metrics(is_lad=False, managed_identity=managed_identity_str)
                            if start_metrics_out:
                                hutil_log("Successfully started metrics-extension.")
                            else:
                                hutil_error(log_messages)
                        else:
                            hutil_error("MetricsExtension binary process is not running. Failed to restart after {0} retries. Please check /var/log/syslog for ME logs".format(max_restart_retries))
        except IOError as e:
            hutil_error('I/O error in setting up or monitoring metrics. Exception={0}'.format(e))
        except Exception as e:
            hutil_error('Error in setting up or monitoring metrics. Exception={0}'.format(e))
        finally:
            time.sleep(sleepTime)