def start_mdsd()

in Diagnostic/diagnostic.py [0:0]


def start_mdsd(configurator):
    """
    Start mdsd and monitor its activities. Report if it crashes or emits error logs.

    Overview of what this function does:
      1. Records the current PID in the LAD pids file and prepares the environment for mdsd.
      2. Validates the mdsd XML config by running mdsd with '-v'; if validation fails, the failure is
         logged/reported and the function returns without launching mdsd.
      3. Launches mdsd and polls it every 30 seconds. While mdsd is alive, it checks for a suspected
         memory leak, restarts OMI if needed, reports new entries in mdsd.err, restarts telegraf and
         MetricsExtension if they are enabled but not running, and refreshes the MetricsExtension MSI
         token when it is missing or expires within 30 minutes.
      4. Restarts mdsd whenever it terminates. After 3 consecutive "quick" crashes (uptime of 30 minutes
         or less), an error status is reported, the omsagent setup is torn down and the function returns.

    :param configurator: A valid LadConfigAll object that was obtained by create_core_components_config().
                         This will be used for configuring rsyslog/syslog-ng/fluentd/in_syslog/out_mdsd components
                         (NOTE(review): not referenced directly in this function's body.)
    :return: None
    """
    # Declared up front (rather than deep inside the monitoring loop) so it is obvious that the
    # cached MSI token expiry is module-level state shared across calls.
    global me_msi_token_expiry_epoch

    # This must be done first, so that extension enable completion doesn't get delayed.
    write_lad_pids_to_file(g_lad_pids_filepath, os.getpid())

    # Need 'HeartBeat' instead of 'Daemon'
    waagent_ext_event_type = wala_event_type_for_telemetry(g_ext_op_type)

    # mdsd http proxy setting
    proxy_config = get_mdsd_proxy_config(waagent.HttpProxyConfigString, g_ext_settings, hutil.log)
    if proxy_config:
        # Add MDSD_http_proxy to current environment. Child processes will inherit its value.
        os.environ['MDSD_http_proxy'] = proxy_config

    copy_env = os.environ.copy()
    # Add MDSD_CONFIG_DIR as an env variable since new mdsd master branch LAD doesn't create this dir
    mdsd_config_cache_dir = os.path.join(g_ext_dir, "config")
    copy_env["MDSD_CONFIG_DIR"] = mdsd_config_cache_dir

    # We then validate the mdsd config and proceed only when it succeeds.
    xml_file = os.path.join(g_ext_dir, 'xmlCfg.xml')
    tmp_env_dict = {}  # Need to get the additionally needed env vars (SSL_CERT_*) for this mdsd run as well...
    g_dist_config.extend_environment(tmp_env_dict)
    added_env_str = ' '.join('{0}={1}'.format(k, tmp_env_dict[k]) for k in tmp_env_dict)
    config_validate_cmd = '{0}{1}{2} -v -c {3} -r {4}'.format(added_env_str, ' ' if added_env_str else '',
                                                       g_mdsd_bin_path, xml_file, g_ext_dir)
    config_validate_cmd_status, config_validate_cmd_msg = RunGetOutput(config_validate_cmd)
    # Compare by value: 'is not 0' relies on CPython small-int caching and warns on Python 3.8+.
    if config_validate_cmd_status != 0:
        # Invalid config. Log error and report success.
        g_lad_log_helper.log_and_report_invalid_mdsd_cfg(g_ext_op_type,
                                                         config_validate_cmd_msg, read_file_to_string(xml_file))
        return

    # Start OMI if it's not running.
    # This shouldn't happen, but this measure is put in place just in case (e.g., Ubuntu 16.04 systemd).
    # Don't check if starting succeeded, as it'll be done in the loop below anyway.
    # An exit status of 1 from 'service_control is-running' is treated as "running" (value comparison,
    # not identity).
    omi_running = RunGetOutput("/opt/omi/bin/service_control is-running", should_log=False)[0] == 1
    if not omi_running:
        hutil.log("OMI is not running. Restarting it.")
        RunGetOutput("/opt/omi/bin/service_control restart")

    log_dir = hutil.get_log_dir()
    err_file_path = os.path.join(log_dir, 'mdsd.err')
    info_file_path = os.path.join(log_dir, 'mdsd.info')
    warn_file_path = os.path.join(log_dir, 'mdsd.warn')
    qos_file_path = os.path.join(log_dir, 'mdsd.qos')
    # Need to provide EH events and Rsyslog spool path since the new mdsd master branch LAD doesn't create
    # the directory needed
    eh_spool_path = os.path.join(log_dir, 'eh')

    update_selinux_settings_for_rsyslogomazuremds(RunGetOutput, g_ext_dir)

    mdsd_stdout_redirect_path = os.path.join(g_ext_dir, "mdsd.log")
    mdsd_stdout_stream = None

    g_dist_config.extend_environment(copy_env)

    # Now prepare actual mdsd cmdline.
    # The formatted string is split on single spaces to build the argv list, so none of the
    # substituted values may contain a space.
    command = '{0} -A -C -c {1} -R -r {2} -e {3} -w {4} -q {8} -S {7} -o {5}{6}'.format(
        g_mdsd_bin_path,
        xml_file,
        g_mdsd_role_name,
        err_file_path,
        warn_file_path,
        info_file_path,
        g_ext_settings.get_mdsd_trace_option(),
        eh_spool_path,
        qos_file_path).split(" ")

    try:
        start_watcher_thread()

        num_quick_consecutive_crashes = 0
        mdsd_crash_msg = ''

        while num_quick_consecutive_crashes < 3:  # We consider only quick & consecutive crashes for retries

            RunGetOutput('rm -f ' + g_mdsd_file_resources_prefix + '.pidport')  # Must delete any existing port num file
            # mdsd's stdout and stderr both go to the same (truncated on each launch) log file.
            mdsd_stdout_stream = open(mdsd_stdout_redirect_path, "w")
            hutil.log("Start mdsd " + str(command))
            mdsd = subprocess.Popen(command,
                                    cwd=g_ext_dir,
                                    stdout=mdsd_stdout_stream,
                                    stderr=mdsd_stdout_stream,
                                    env=copy_env)

            write_lad_pids_to_file(g_lad_pids_filepath, os.getpid(), mdsd.pid)

            last_mdsd_start_time = datetime.datetime.now()
            last_error_time = last_mdsd_start_time
            omi_installed = True  # Remembers if OMI is installed at each iteration
            telegraf_restart_retries = 0
            me_restart_retries = 0
            max_restart_retries = 10
            # Continuously monitors mdsd process
            while True:
                time.sleep(30)

                # If the pids file no longer mentions our mdsd child but lists at least two pids,
                # another LAD instance has taken over: kill our mdsd and exit.
                # Read the pids once so both tests below see the same snapshot.
                # NOTE(review): this is a substring match on the joined pid strings, so e.g. pid 123
                # also "matches" 1234 -- confirm get_lad_pids() format before making it an exact match.
                lad_pids = get_lad_pids()
                if str(mdsd.pid) not in " ".join(lad_pids) and len(lad_pids) >= 2:
                    mdsd.kill()
                    hutil.log("Another process is started, now exit")
                    return  # The 'finally' clause below closes mdsd_stdout_stream.
                if mdsd.poll() is not None:  # if mdsd has terminated
                    time.sleep(60)
                    mdsd_stdout_stream.flush()
                    break

                # mdsd is now up for at least 30 seconds. Do some monitoring activities.
                # 1. Mitigate if memory leak is suspected.
                mdsd_memory_leak_suspected, mdsd_memory_usage_in_KB = check_suspected_memory_leak(mdsd.pid, hutil.error)
                if mdsd_memory_leak_suspected:
                    g_lad_log_helper.log_suspected_memory_leak_and_kill_mdsd(mdsd_memory_usage_in_KB, mdsd,
                                                                             waagent_ext_event_type)
                    break
                # 2. Restart OMI if it crashed (Issue #128)
                omi_installed = restart_omi_if_crashed(omi_installed, mdsd)
                # 3. Check if there's any new logs in mdsd.err and report
                last_error_time = report_new_mdsd_errors(err_file_path, last_error_time)
                # 4. Check if telegraf is running, if not, then restart
                if enable_telegraf and not telhandler.is_running(is_lad=True):
                    if telegraf_restart_retries < max_restart_retries:
                        telegraf_restart_retries += 1
                        hutil.log("Telegraf binary process is not running. Restarting telegraf now. Retry count - {0}".format(telegraf_restart_retries))
                        tel_out, tel_msg = telhandler.stop_telegraf_service(is_lad=True)
                        if tel_out:
                            hutil.log(tel_msg)
                        else:
                            hutil.error(tel_msg)
                        start_telegraf_res, log_messages = telhandler.start_telegraf(is_lad=True)
                        if start_telegraf_res:
                            hutil.log("Successfully started metrics-sourcer.")
                        else:
                            hutil.error(log_messages)
                    else:
                        hutil.error("Telegraf binary process is not running. Failed to restart after {0} retries. Please check telegraf.log at {1}".format(max_restart_retries, log_dir))
                else:
                    # Telegraf is disabled or running: reset the retry budget.
                    telegraf_restart_retries = 0
                # 5. Check if ME is running, if not, then restart
                if enable_metrics_ext:
                    if not me_handler.is_running(is_lad=True):
                        if me_restart_retries < max_restart_retries:
                            me_restart_retries += 1
                            hutil.log("MetricsExtension binary process is not running. Restarting MetricsExtension now. Retry count - {0}".format(me_restart_retries))
                            me_out, me_msg = me_handler.stop_metrics_service(is_lad=True)
                            if me_out:
                                hutil.log(me_msg)
                            else:
                                hutil.error(me_msg)
                            start_metrics_out, log_messages = me_handler.start_metrics(is_lad=True)
                            if start_metrics_out:
                                hutil.log("Successfully started metrics-extension.")
                            else:
                                hutil.error(log_messages)
                        else:
                            hutil.error("MetricsExtension binary process is not running. Failed to restart after {0} retries. Please check /var/log/syslog for ME logs".format(max_restart_retries))
                    else:
                        me_restart_retries = 0
                    # 6. Regenerate the MSI auth token required for ME if it is nearing expiration
                    # Generate/regenerate MSI Token required by ME
                    generate_token = False
                    me_token_path = g_ext_dir + "/config/metrics_configs/AuthToken-MSI.json"

                    if me_msi_token_expiry_epoch is None or me_msi_token_expiry_epoch == "":
                        # No cached expiry yet: try to recover it from the token file on disk.
                        if os.path.isfile(me_token_path):
                            try:
                                with open(me_token_path, "r") as f:
                                    authtoken_content = json.loads(f.read())
                            except (IOError, OSError, ValueError) as token_read_err:
                                # An unreadable/corrupt token file must not abort mdsd monitoring;
                                # treat it like a missing token and regenerate.
                                hutil.error("Failed to read MSI auth token file {0}: {1}".format(
                                    me_token_path, token_read_err))
                                authtoken_content = None
                            if isinstance(authtoken_content, dict) and "expires_on" in authtoken_content:
                                me_msi_token_expiry_epoch = authtoken_content["expires_on"]
                            else:
                                generate_token = True
                        else:
                            generate_token = True

                    if me_msi_token_expiry_epoch:
                        currentTime = datetime.datetime.now()
                        token_expiry_time = datetime.datetime.fromtimestamp(float(me_msi_token_expiry_epoch))
                        if token_expiry_time - currentTime < datetime.timedelta(minutes=30):
                            # The MSI Token will expire within 30 minutes. We need to refresh the token
                            generate_token = True

                    if generate_token:
                        generate_token = False
                        msi_token_generated, me_msi_token_expiry_epoch, log_messages = me_handler.generate_MSI_token()
                        if msi_token_generated:
                            hutil.log("Successfully refreshed metrics-extension MSI Auth token.")
                        else:
                            hutil.error(log_messages)

            # Out of the inner while loop: mdsd terminated.
            if mdsd_stdout_stream:
                mdsd_stdout_stream.close()
                mdsd_stdout_stream = None

            # Check if this is NOT a quick crash -- we consider a crash quick
            # if it's within 30 minutes from the start time. If it's not quick,
            # we just continue by restarting mdsd.
            mdsd_up_time = datetime.datetime.now() - last_mdsd_start_time
            if mdsd_up_time > datetime.timedelta(minutes=30):
                mdsd_terminated_msg = "MDSD terminated after " + str(mdsd_up_time) + ". "\
                                      + tail(mdsd_stdout_redirect_path) + tail(err_file_path)
                hutil.log(mdsd_terminated_msg)
                num_quick_consecutive_crashes = 0
                continue

            # It's a quick crash. Log error and add an extension event.
            num_quick_consecutive_crashes += 1

            mdsd_crash_msg = "MDSD crash(uptime=" + str(mdsd_up_time) + "):" + tail(mdsd_stdout_redirect_path) + tail(err_file_path)
            hutil.error("MDSD crashed:" + mdsd_crash_msg)

        # mdsd all 3 allowed quick/consecutive crashes exhausted
        hutil.do_status_report(waagent_ext_event_type, "error", '1', "mdsd stopped: " + mdsd_crash_msg)
        # Need to tear down omsagent setup for LAD before returning/exiting if it was set up earlier
        oms.tear_down_omsagent_for_lad(RunGetOutput, False)
        try:
            waagent.AddExtensionEvent(name=hutil.get_name(),
                                      op=waagent_ext_event_type,
                                      isSuccess=False,
                                      version=hutil.get_extension_version(),
                                      message=mdsd_crash_msg)
        except Exception as event_err:
            # Telemetry is best-effort here, but leave a trace instead of failing silently.
            hutil.error("Failed to add extension event for mdsd crash: {0}".format(event_err))

    except Exception as e:
        if mdsd_stdout_stream:
            hutil.error("Error :" + tail(mdsd_stdout_redirect_path))
        errmsg = "Failed to launch mdsd with error: {0}, traceback: {1}".format(e, traceback.format_exc())
        hutil.error(errmsg)
        hutil.do_status_report(waagent_ext_event_type, 'error', '1', errmsg)
        waagent.AddExtensionEvent(name=hutil.get_name(),
                                  op=waagent_ext_event_type,
                                  isSuccess=False,
                                  version=hutil.get_extension_version(),
                                  message=errmsg)
    finally:
        # Covers every exit path (early return, exception) where the stream is still open.
        if mdsd_stdout_stream:
            mdsd_stdout_stream.close()