def metrics_watcher()

in AzureMonitorAgent/agent.py [0:0]


def metrics_watcher(hutil_error, hutil_log):
    """
    Watcher thread to monitor metric configuration changes and to take action on them.

    Every ``sleepTime`` seconds the loop:
      1. Rewrites the ``Port`` line of the fluent config if the port recorded in
         ``AMAFluentPortFilePath`` is not already present, and adds an SELinux
         ``http_port_t`` rule for that port when SELinux is not disabled.
      2. Restarts the launcher when the fluent config content hash changes.
      3. Reads the metrics counter JSON; an empty configuration stops and removes
         the telegraf and metrics-extension services, a changed non-empty
         configuration sets them up and starts them again.
      4. Refreshes the metrics-extension MSI token when none is known or when it
         expires within 30 minutes.
      5. Restarts telegraf / metrics-extension if they are found not running, up
         to ``max_restart_retries`` consecutive attempts each.

    This function never returns; exceptions raised inside an iteration are
    reported through ``hutil_error`` and the loop continues after sleeping.

    :param hutil_error: callable taking a single message, used to log errors
    :param hutil_log: callable taking a single message, used to log information
    """

    # Check every 30 seconds
    sleepTime = 30

    # Retrieve managed identity info that may be needed for token retrieval
    identifier_name, identifier_value, error_msg = get_managed_identity()
    if error_msg:
        hutil_error('Failed to determine managed identity settings; MSI token retreival will rely on default identity, if any. {0}.'.format(error_msg))
    if identifier_name and identifier_value:
        managed_identity_str = "uai#{0}#{1}".format(identifier_name, identifier_value)
    else:
        managed_identity_str = "sai"

    # Sleep before starting the monitoring
    time.sleep(sleepTime)
    last_crc = None
    last_crc_fluent = None
    me_msi_token_expiry_epoch = None

    # Restart bookkeeping must outlive a single iteration; if it were reset on
    # every pass the retry cap below could never be reached.
    telegraf_restart_retries = 0
    me_restart_retries = 0
    max_restart_retries = 10

    while True:
        try:
            # update fluent config for fluent port if needed
            fluent_port = ''
            if os.path.isfile(AMAFluentPortFilePath):
                with open(AMAFluentPortFilePath, "r") as f:
                    fluent_port = f.read()

            if fluent_port != '' and os.path.isfile(FluentCfgPath):
                portSetting = "    Port                       " + fluent_port + "\n"
                defaultPortSetting = 'Port'

                # The config needs rewriting unless some line already sets this exact port
                port_pattern = re.compile(r'^\s{0,}Port\s{1,}' + fluent_port + '$')
                with open(FluentCfgPath, 'r') as f:
                    port_already_set = any(port_pattern.search(line) for line in f)

                if not port_already_set:
                    # In-place edit: everything printed inside this loop is written back to the file
                    with contextlib.closing(fileinput.FileInput(FluentCfgPath, inplace=True, backup='.bak')) as file:
                        for line in file:
                            if defaultPortSetting in line:
                                print(portSetting, end='')
                            else:
                                print(line, end='')
                    os.chmod(FluentCfgPath, stat.S_IRGRP | stat.S_IRUSR | stat.S_IWUSR | stat.S_IROTH)

                    # add SELinux rules if needed
                    if os.path.exists('/etc/selinux/config'):
                        sedisabled, _ = run_command_and_log('getenforce | grep -i "Disabled"',log_cmd=False, log_output=False)
                        # grep exits non-zero when "Disabled" is absent from the getenforce output
                        if sedisabled != 0:
                            check_semanage, _ = run_command_and_log("which semanage",log_cmd=False, log_output=False)
                            if check_semanage == 0:
                                fluentPortEnabled, _ = run_command_and_log('grep -Rnw /var/lib/selinux -e http_port_t | grep ' + fluent_port,log_cmd=False, log_output=False)
                                if fluentPortEnabled != 0:
                                    # also check SELinux config paths for Oracle/RH
                                    fluentPortEnabled, _ = run_command_and_log('grep -Rnw /etc/selinux -e http_port_t | grep ' + fluent_port,log_cmd=False, log_output=False)
                                    if fluentPortEnabled != 0:
                                        # allow the fluent port in SELinux
                                        run_command_and_log('semanage port -a -t http_port_t -p tcp ' + fluent_port,log_cmd=False, log_output=False)

            if os.path.isfile(FluentCfgPath):
                # Context manager so the handle is released on every iteration
                with open(FluentCfgPath, "r") as f:
                    data = f.read()

                if data != '':
                    crc_fluent = hashlib.sha256(data.encode('utf-8')).hexdigest()

                    if crc_fluent != last_crc_fluent:
                        restart_launcher()
                        last_crc_fluent = crc_fluent

            if os.path.isfile(MdsdCounterJsonPath):
                with open(MdsdCounterJsonPath, "r") as f:
                    data = f.read()

                if data != '':
                    json_data = json.loads(data)

                    if len(json_data) == 0:
                        # Empty metric configuration: tear the metrics pipeline down
                        last_crc = hashlib.sha256(data.encode('utf-8')).hexdigest()
                        if telhandler.is_running(is_lad=False):
                            # Stop the telegraf and ME services
                            tel_out, tel_msg = telhandler.stop_telegraf_service(is_lad=False)
                            if tel_out:
                                hutil_log(tel_msg)
                            else:
                                hutil_error(tel_msg)

                            # Delete the telegraf and ME services
                            tel_rm_out, tel_rm_msg = telhandler.remove_telegraf_service(is_lad=False)
                            if tel_rm_out:
                                hutil_log(tel_rm_msg)
                            else:
                                hutil_error(tel_rm_msg)

                        if me_handler.is_running(is_lad=False):
                            me_out, me_msg = me_handler.stop_metrics_service(is_lad=False)
                            if me_out:
                                hutil_log(me_msg)
                            else:
                                hutil_error(me_msg)

                            me_rm_out, me_rm_msg = me_handler.remove_metrics_service(is_lad=False)
                            if me_rm_out:
                                hutil_log(me_rm_msg)
                            else:
                                hutil_error(me_rm_msg)
                    else:
                        crc = hashlib.sha256(data.encode('utf-8')).hexdigest()

                        if crc != last_crc:
                            # Resetting the me_msi_token_expiry_epoch variable if we set up ME again.
                            me_msi_token_expiry_epoch = None
                            hutil_log("Start processing metric configuration")
                            hutil_log(data)

                            telegraf_config, telegraf_namespaces = telhandler.handle_config(
                                json_data,
                                "unix:///run/azuremonitoragent/mdm_influxdb.socket",
                                "unix:///run/azuremonitoragent/default_influx.socket",
                                is_lad=False)

                            # Replace the active service template with the variant matching the feature flag
                            me_service_template_path = os.getcwd() + "/services/metrics-extension.service"
                            if os.path.exists(me_service_template_path):
                                os.remove(me_service_template_path)
                            if is_feature_enabled("enableCMV2"):
                                copyfile(os.getcwd() + "/services/metrics-extension-otlp.service", me_service_template_path)
                            else:
                                copyfile(os.getcwd() + "/services/metrics-extension-cmv1.service", me_service_template_path)

                            me_handler.setup_me(is_lad=False, managed_identity=managed_identity_str, HUtilObj=HUtilObject)

                            start_telegraf_res, log_messages = telhandler.start_telegraf(is_lad=False)
                            if start_telegraf_res:
                                hutil_log("Successfully started metrics-sourcer.")
                            else:
                                hutil_error(log_messages)

                            start_metrics_out, log_messages = me_handler.start_metrics(is_lad=False, managed_identity=managed_identity_str)
                            if start_metrics_out:
                                hutil_log("Successfully started metrics-extension.")
                            else:
                                hutil_error(log_messages)

                            last_crc = crc

                        generate_token = False
                        # Components must be relative: an absolute component makes
                        # os.path.join discard the working directory entirely.
                        # NOTE(review): assumes the token file lives under the extension's
                        # working directory, like the service templates above - confirm
                        # against me_handler.generate_MSI_token.
                        me_token_path = os.path.join(os.getcwd(), "config", "metrics_configs", "AuthToken-MSI.json")

                        if me_msi_token_expiry_epoch is None or me_msi_token_expiry_epoch == "":
                            if os.path.isfile(me_token_path):
                                with open(me_token_path, "r") as f:
                                    authtoken_content = f.read()
                                # The file holds JSON; parse it before looking up the expiry field
                                try:
                                    token_data = json.loads(authtoken_content) if authtoken_content else None
                                except ValueError:
                                    token_data = None
                                if isinstance(token_data, dict) and token_data.get("expires_on"):
                                    me_msi_token_expiry_epoch = token_data["expires_on"]
                                else:
                                    generate_token = True
                            else:
                                generate_token = True

                        if me_msi_token_expiry_epoch:
                            currentTime = datetime.datetime.now()
                            try:
                                token_expiry_time = datetime.datetime.fromtimestamp(int(me_msi_token_expiry_epoch))
                            except (TypeError, ValueError, OverflowError, OSError):
                                # Unusable expiry value: request a fresh token instead of failing every pass
                                generate_token = True
                            else:
                                if token_expiry_time - currentTime < datetime.timedelta(minutes=30):
                                    # The MSI Token will expire within 30 minutes. We need to refresh the token
                                    generate_token = True

                        if generate_token:
                            msi_token_generated, me_msi_token_expiry_epoch, log_messages = me_handler.generate_MSI_token(identifier_name, identifier_value, is_lad=False)
                            if msi_token_generated:
                                hutil_log("Successfully refreshed metrics-extension MSI Auth token.")
                            else:
                                hutil_error(log_messages)

                        # Check if telegraf is running, if not, then restart
                        if not telhandler.is_running(is_lad=False):
                            if telegraf_restart_retries < max_restart_retries:
                                telegraf_restart_retries += 1
                                hutil_log("Telegraf binary process is not running. Restarting telegraf now. Retry count - {0}".format(telegraf_restart_retries))
                                tel_out, tel_msg = telhandler.stop_telegraf_service(is_lad=False)
                                if tel_out:
                                    hutil_log(tel_msg)
                                else:
                                    hutil_error(tel_msg)
                                start_telegraf_res, log_messages = telhandler.start_telegraf(is_lad=False)
                                if start_telegraf_res:
                                    hutil_log("Successfully started metrics-sourcer.")
                                else:
                                    hutil_error(log_messages)
                            else:
                                hutil_error("Telegraf binary process is not running. Failed to restart after {0} retries. Please check telegraf.log".format(max_restart_retries))
                        else:
                            # Healthy again: start counting from scratch on the next failure
                            telegraf_restart_retries = 0

                        # Check if ME is running, if not, then restart
                        if not me_handler.is_running(is_lad=False):
                            if me_restart_retries < max_restart_retries:
                                me_restart_retries += 1
                                hutil_log("MetricsExtension binary process is not running. Restarting MetricsExtension now. Retry count - {0}".format(me_restart_retries))
                                me_out, me_msg = me_handler.stop_metrics_service(is_lad=False)
                                if me_out:
                                    hutil_log(me_msg)
                                else:
                                    hutil_error(me_msg)
                                start_metrics_out, log_messages = me_handler.start_metrics(is_lad=False, managed_identity=managed_identity_str)

                                if start_metrics_out:
                                    hutil_log("Successfully started metrics-extension.")
                                else:
                                    hutil_error(log_messages)
                            else:
                                hutil_error("MetricsExtension binary process is not running. Failed to restart after {0} retries. Please check /var/log/syslog for ME logs".format(max_restart_retries))
                        else:
                            me_restart_retries = 0

        except IOError as e:
            hutil_error('I/O error in setting up or monitoring metrics. Exception={0}'.format(e))

        except Exception as e:
            # Thread-level boundary: report and keep the watcher alive
            hutil_error('Error in setting up or monitoring metrics. Exception={0}'.format(e))

        finally:
            time.sleep(sleepTime)