in Diagnostic/diagnostic.py [0:0]
def start_mdsd(configurator):
"""
Start mdsd and monitor its activities. Report if it crashes or emits error logs.
:param configurator: A valid LadConfigAll object that was obtained by create_core_components_config().
This is used to configure the rsyslog/syslog-ng/fluentd/in_syslog/out_mdsd components.
:return: None
"""
# This must be done first, so that extension enable completion doesn't get delayed.
write_lad_pids_to_file(g_lad_pids_filepath, os.getpid())
# The telemetry event type needs to be 'HeartBeat' instead of 'Daemon'
waagent_ext_event_type = wala_event_type_for_telemetry(g_ext_op_type)
# mdsd http proxy setting
proxy_config = get_mdsd_proxy_config(waagent.HttpProxyConfigString, g_ext_settings, hutil.log)
if proxy_config:
# Add MDSD_http_proxy to current environment. Child processes will inherit its value.
os.environ['MDSD_http_proxy'] = proxy_config
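# Snapshot the current environment; mdsd and the other child processes are launched with this copy.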
copy_env = os.environ.copy()
# Add MDSD_CONFIG_DIR as an env variable, since the new mdsd master-branch LAD doesn't create this directory
mdsd_config_cache_dir = os.path.join(g_ext_dir, "config")
copy_env["MDSD_CONFIG_DIR"] = mdsd_config_cache_dir
# We then validate the mdsd config and proceed only when it succeeds.
xml_file = os.path.join(g_ext_dir, 'xmlCfg.xml')
tmp_env_dict = {} # Also collect the extra env vars (SSL_CERT_*) needed for this mdsd run
g_dist_config.extend_environment(tmp_env_dict)
added_env_str = ' '.join('{0}={1}'.format(k, tmp_env_dict[k]) for k in tmp_env_dict)
config_validate_cmd = '{0}{1}{2} -v -c {3} -r {4}'.format(added_env_str, ' ' if added_env_str else '',
g_mdsd_bin_path, xml_file, g_ext_dir)
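# The resulting command looks roughly like (values illustrative):
#   SSL_CERT_DIR=/etc/ssl/certs <mdsd_bin> -v -c <ext_dir>/xmlCfg.xml -r <ext_dir>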
config_validate_cmd_status, config_validate_cmd_msg = RunGetOutput(config_validate_cmd)
if config_validate_cmd_status != 0:
# Invalid config. Log error and report success.
g_lad_log_helper.log_and_report_invalid_mdsd_cfg(g_ext_op_type,
config_validate_cmd_msg, read_file_to_string(xml_file))
return
# Start OMI if it's not running.
# This shouldn't happen, but this measure is put in place just in case (e.g., Ubuntu 16.04 systemd).
# Don't check if starting succeeded, as it'll be done in the loop below anyway.
omi_running = RunGetOutput("/opt/omi/bin/service_control is-running", should_log=False)[0] == 1
if not omi_running:
hutil.log("OMI is not running. Restarting it.")
RunGetOutput("/opt/omi/bin/service_control restart")
log_dir = hutil.get_log_dir()
err_file_path = os.path.join(log_dir, 'mdsd.err')
info_file_path = os.path.join(log_dir, 'mdsd.info')
warn_file_path = os.path.join(log_dir, 'mdsd.warn')
qos_file_path = os.path.join(log_dir, 'mdsd.qos')
# Provide the EH events and rsyslog spool path, since the new mdsd master-branch LAD doesn't create the directory it needs
eh_spool_path = os.path.join(log_dir, 'eh')
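# Adjust SELinux settings (where applicable) so that rsyslog's omazuremds output module can communicate with mdsd.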
update_selinux_settings_for_rsyslogomazuremds(RunGetOutput, g_ext_dir)
mdsd_stdout_redirect_path = os.path.join(g_ext_dir, "mdsd.log")
mdsd_stdout_stream = None
g_dist_config.extend_environment(copy_env)
# Now prepare actual mdsd cmdline.
command = '{0} -A -C -c {1} -R -r {2} -e {3} -w {4} -q {8} -S {7} -o {5}{6}'.format(
g_mdsd_bin_path,
xml_file,
g_mdsd_role_name,
err_file_path,
warn_file_path,
info_file_path,
g_ext_settings.get_mdsd_trace_option(),
eh_spool_path,
qos_file_path).split(" ")
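# The resulting command looks roughly like (paths illustrative):
#   <mdsd_bin> -A -C -c <ext_dir>/xmlCfg.xml -R -r <role_name> -e mdsd.err -w mdsd.warn \
#       -q mdsd.qos -S <log_dir>/eh -o mdsd.info<trace options>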
try:
start_watcher_thread()
num_quick_consecutive_crashes = 0
mdsd_crash_msg = ''
while num_quick_consecutive_crashes < 3: # We consider only quick & consecutive crashes for retries
RunGetOutput('rm -f ' + g_mdsd_file_resources_prefix + '.pidport') # Must delete any existing port num file
mdsd_stdout_stream = open(mdsd_stdout_redirect_path, "w")
hutil.log("Start mdsd " + str(command))
mdsd = subprocess.Popen(command,
cwd=g_ext_dir,
stdout=mdsd_stdout_stream,
stderr=mdsd_stdout_stream,
env=copy_env)
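# Record the new mdsd child's PID alongside our own in the LAD pid file.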
write_lad_pids_to_file(g_lad_pids_filepath, os.getpid(), mdsd.pid)
last_mdsd_start_time = datetime.datetime.now()
last_error_time = last_mdsd_start_time
omi_installed = True # Remembers if OMI is installed at each iteration
telegraf_restart_retries = 0
me_restart_retries = 0
max_restart_retries = 10
# Continuously monitor the mdsd process
while True:
time.sleep(30)
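# If our mdsd's PID is no longer recorded in the LAD pid file but other PIDs are,
# another extension instance appears to have taken over; stop this mdsd and exit.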
if " ".join(get_lad_pids()).find(str(mdsd.pid)) < 0 and len(get_lad_pids()) >= 2:
mdsd.kill()
hutil.log("Another process is started, now exit")
return
if mdsd.poll() is not None: # if mdsd has terminated
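# mdsd has exited; pause briefly (presumably to let any remaining output be written),
# flush the captured stream, and handle the crash below.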
time.sleep(60)
mdsd_stdout_stream.flush()
break
# mdsd is now up for at least 30 seconds. Do some monitoring activities.
# 1. Mitigate if memory leak is suspected.
mdsd_memory_leak_suspected, mdsd_memory_usage_in_KB = check_suspected_memory_leak(mdsd.pid, hutil.error)
if mdsd_memory_leak_suspected:
g_lad_log_helper.log_suspected_memory_leak_and_kill_mdsd(mdsd_memory_usage_in_KB, mdsd,
waagent_ext_event_type)
break
# 2. Restart OMI if it crashed (Issue #128)
omi_installed = restart_omi_if_crashed(omi_installed, mdsd)
# 3. Check whether there are any new logs in mdsd.err and report them
last_error_time = report_new_mdsd_errors(err_file_path, last_error_time)
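# (report_new_mdsd_errors returns the timestamp of the newest reported error,
# so only newer entries should be picked up on the next pass.)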
# 4. Check if telegraf is running; if not, restart it
if enable_telegraf and not telhandler.is_running(is_lad=True):
if telegraf_restart_retries < max_restart_retries:
telegraf_restart_retries += 1
hutil.log("Telegraf binary process is not running. Restarting telegraf now. Retry count - {0}".format(telegraf_restart_retries))
tel_out, tel_msg = telhandler.stop_telegraf_service(is_lad=True)
if tel_out:
hutil.log(tel_msg)
else:
hutil.error(tel_msg)
start_telegraf_res, log_messages = telhandler.start_telegraf(is_lad=True)
if start_telegraf_res:
hutil.log("Successfully started metrics-sourcer.")
else:
hutil.error(log_messages)
else:
hutil.error("Telegraf binary process is not running. Failed to restart after {0} retries. Please check telegraf.log at {1}".format(max_restart_retries, log_dir))
else:
telegraf_restart_retries = 0
# 5. Check if ME is running; if not, restart it
if enable_metrics_ext:
if not me_handler.is_running(is_lad=True):
if me_restart_retries < max_restart_retries:
me_restart_retries += 1
hutil.log("MetricsExtension binary process is not running. Restarting MetricsExtension now. Retry count - {0}".format(me_restart_retries))
me_out, me_msg = me_handler.stop_metrics_service(is_lad=True)
if me_out:
hutil.log(me_msg)
else:
hutil.error(me_msg)
start_metrics_out, log_messages = me_handler.start_metrics(is_lad=True)
if start_metrics_out:
hutil.log("Successfully started metrics-extension.")
else:
hutil.error(log_messages)
else:
hutil.error("MetricsExtension binary process is not running. Failed to restart after {0} retries. Please check /var/log/syslog for ME logs".format(max_restart_retries))
else:
me_restart_retries = 0
# 6. Generate the MSI auth token required by ME, or regenerate it if it is nearing expiration
global me_msi_token_expiry_epoch
generate_token = False
me_token_path = os.path.join(g_ext_dir, "config", "metrics_configs", "AuthToken-MSI.json")
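# The token file is expected to carry an "expires_on" field holding a Unix-epoch timestamp,
# e.g. {"expires_on": "1735689600", ...} (illustrative); only that field is read here.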
if me_msi_token_expiry_epoch is None or me_msi_token_expiry_epoch == "":
if os.path.isfile(me_token_path):
with open(me_token_path, "r") as f:
authtoken_content = json.loads(f.read())
if authtoken_content and "expires_on" in authtoken_content:
me_msi_token_expiry_epoch = authtoken_content["expires_on"]
else:
generate_token = True
else:
generate_token = True
if me_msi_token_expiry_epoch:
currentTime = datetime.datetime.now()
token_expiry_time = datetime.datetime.fromtimestamp(float(me_msi_token_expiry_epoch))
if token_expiry_time - currentTime < datetime.timedelta(minutes=30):
# The MSI Token will expire within 30 minutes. We need to refresh the token
generate_token = True
if generate_token:
generate_token = False
msi_token_generated, me_msi_token_expiry_epoch, log_messages = me_handler.generate_MSI_token()
if msi_token_generated:
hutil.log("Successfully refreshed metrics-extension MSI Auth token.")
else:
hutil.error(log_messages)
# Out of the inner while loop: mdsd terminated.
if mdsd_stdout_stream:
mdsd_stdout_stream.close()
mdsd_stdout_stream = None
# Check if this is NOT a quick crash -- we consider a crash quick
# if it's within 30 minutes from the start time. If it's not quick,
# we just continue by restarting mdsd.
mdsd_up_time = datetime.datetime.now() - last_mdsd_start_time
if mdsd_up_time > datetime.timedelta(minutes=30):
mdsd_terminated_msg = "MDSD terminated after " + str(mdsd_up_time) + ". "\
+ tail(mdsd_stdout_redirect_path) + tail(err_file_path)
hutil.log(mdsd_terminated_msg)
num_quick_consecutive_crashes = 0
continue
# It's a quick crash. Log error and add an extension event.
num_quick_consecutive_crashes += 1
mdsd_crash_msg = "MDSD crash(uptime=" + str(mdsd_up_time) + "):" + tail(mdsd_stdout_redirect_path) + tail(err_file_path)
hutil.error("MDSD crashed:" + mdsd_crash_msg)
# All 3 allowed quick/consecutive mdsd crashes are exhausted; report the error and stop retrying
hutil.do_status_report(waagent_ext_event_type, "error", '1', "mdsd stopped: " + mdsd_crash_msg)
# Need to tear down omsagent setup for LAD before returning/exiting if it was set up earlier
oms.tear_down_omsagent_for_lad(RunGetOutput, False)
try:
waagent.AddExtensionEvent(name=hutil.get_name(),
op=waagent_ext_event_type,
isSuccess=False,
version=hutil.get_extension_version(),
message=mdsd_crash_msg)
except Exception:
pass
except Exception as e:
if mdsd_stdout_stream:
hutil.error("Error :" + tail(mdsd_stdout_redirect_path))
errmsg = "Failed to launch mdsd with error: {0}, traceback: {1}".format(e, traceback.format_exc())
hutil.error(errmsg)
hutil.do_status_report(waagent_ext_event_type, 'error', '1', errmsg)
waagent.AddExtensionEvent(name=hutil.get_name(),
op=waagent_ext_event_type,
isSuccess=False,
version=hutil.get_extension_version(),
message=errmsg)
finally:
if mdsd_stdout_stream:
mdsd_stdout_stream.close()