azurelinuxagent/ga/collect_logs.py (273 lines of code) (raw):

# Microsoft Azure Linux Agent
#
# Copyright 2020 Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Requires Python 2.6+ and Openssl 1.0+
#
import datetime
import os
import sys
import threading
import time

from azurelinuxagent.ga import logcollector, cgroupconfigurator
import azurelinuxagent.common.conf as conf
from azurelinuxagent.common import logger
from azurelinuxagent.ga.cgroupcontroller import MetricsCounter
from azurelinuxagent.common.event import elapsed_milliseconds, add_event, WALAEventOperation
from azurelinuxagent.common.future import ustr
from azurelinuxagent.ga.interfaces import ThreadHandlerInterface
from azurelinuxagent.ga.logcollector import COMPRESSED_ARCHIVE_PATH, GRACEFUL_KILL_ERRCODE
from azurelinuxagent.ga.cgroupconfigurator import CGroupConfigurator, LOGCOLLECTOR_ANON_MEMORY_LIMIT_FOR_V1_AND_V2, LOGCOLLECTOR_CACHE_MEMORY_LIMIT_FOR_V1_AND_V2, LOGCOLLECTOR_MAX_THROTTLED_EVENTS_FOR_V2
from azurelinuxagent.common.protocol.util import get_protocol_util
from azurelinuxagent.common.utils import shellutil
from azurelinuxagent.common.utils.shellutil import CommandError
from azurelinuxagent.common.version import PY_VERSION_MAJOR, PY_VERSION_MINOR, AGENT_NAME, CURRENT_VERSION


def get_collect_logs_handler():
    """Return a new CollectLogsHandler instance."""
    return CollectLogsHandler()


def is_log_collection_allowed():
    """
    Decide whether periodic log collection may run.

    The decision is logged locally and reported as a LogCollection telemetry event.

    :return: True only if all three conditions described below are met.
    """
    # There are three conditions that need to be met in order to allow periodic log collection:
    # 1) It should be enabled in the configuration.
    # 2) The system must be using cgroups to manage services - needed for resource limiting of the log collection. The
    #    agent currently fully supports resource limiting for v1, but only supports log collector resource limiting for
    #    v2 if enabled via configuration.
    #    This condition is True if either:
    #      a. cgroup usage in the agent is enabled; OR
    #      b. the machine is using cgroup v2 and v2 resource limiting is enabled in the configuration.
    # 3) The Python version must be 2.6 or later (2.x with minor version >= 6, or any 3.x) in order to support the
    #    ZipFile library used when collecting.
    conf_enabled = conf.get_collect_logs()
    cgroups_enabled = CGroupConfigurator.get_instance().enabled()
    cgroup_v2_resource_limiting_enabled = CGroupConfigurator.get_instance().using_cgroup_v2() and conf.get_enable_cgroup_v2_resource_limiting()
    supported_python = PY_VERSION_MINOR >= 6 if PY_VERSION_MAJOR == 2 else PY_VERSION_MAJOR == 3
    is_allowed = conf_enabled and (cgroups_enabled or cgroup_v2_resource_limiting_enabled) and supported_python

    msg = "Checking if log collection is allowed at this time [{0}]. All three conditions must be met: " \
          "1. configuration enabled [{1}], " \
          "2. cgroups v1 enabled [{2}] OR cgroups v2 is in use and v2 resource limiting configuration enabled [{3}], " \
          "3. python supported: [{4}]".format(is_allowed, conf_enabled, cgroups_enabled, cgroup_v2_resource_limiting_enabled, supported_python)
    logger.info(msg)
    add_event(
        name=AGENT_NAME,
        version=CURRENT_VERSION,
        op=WALAEventOperation.LogCollection,
        is_success=is_allowed,
        message=msg,
        log_event=False)

    return is_allowed


class CollectLogsHandler(ThreadHandlerInterface):
    """
    Periodically collects and uploads logs from the VM to the host.

    Collection work runs on a background daemon thread (see daemon()).
    - Each iteration runs the agent's "-collect-logs" tool inside a transient systemd scope (see _collect_logs()).
    - When that succeeds, the resulting archive is uploaded through the protocol (see _send_logs()).
    """

    _THREAD_NAME = "CollectLogsHandler"

    # Environment variable used as an on/off flag for cgroup monitoring of the log collector.
    # It is stored in os.environ, so child processes started while it is set inherit it.
    # Presumably the log collector process reads it; confirm against logcollector.
    __CGROUPS_FLAG_ENV_VARIABLE = "_AZURE_GUEST_AGENT_LOG_COLLECTOR_MONITOR_CGROUPS_"

    @staticmethod
    def get_thread_name():
        """Return the name given to the collection thread."""
        return CollectLogsHandler._THREAD_NAME

    @staticmethod
    def enable_monitor_cgroups_check():
        """Set the cgroups-monitoring flag environment variable to "1"."""
        os.environ[CollectLogsHandler.__CGROUPS_FLAG_ENV_VARIABLE] = "1"

    @staticmethod
    def disable_monitor_cgroups_check():
        """Remove the cgroups-monitoring flag environment variable, if it is set."""
        if CollectLogsHandler.__CGROUPS_FLAG_ENV_VARIABLE in os.environ:
            del os.environ[CollectLogsHandler.__CGROUPS_FLAG_ENV_VARIABLE]

    @staticmethod
    def is_enabled_monitor_cgroups_check():
        """Return True only when the cgroups-monitoring flag environment variable is set to "1"."""
        if CollectLogsHandler.__CGROUPS_FLAG_ENV_VARIABLE in os.environ:
            return os.environ[CollectLogsHandler.__CGROUPS_FLAG_ENV_VARIABLE] == "1"
        return False

    def __init__(self):
        # The protocol objects are created lazily on the collection thread itself (see init_protocols()).
        self.protocol = None
        self.protocol_util = None
        self.event_thread = None
        self.should_run = True
        self.last_state = None
        # Seconds to sleep between collection attempts (passed to time.sleep in daemon()).
        self.period = conf.get_collect_logs_period()

    def run(self):
        """Start the collection thread."""
        self.start()

    def keep_alive(self):
        """Return True until stop() has been called."""
        return self.should_run

    def is_alive(self):
        """Return True if the collection thread has been started and is still running."""
        # stop() may run before start() has created the thread. Guard against None here, the same way
        # LogCollectorMonitorHandler.is_alive does, instead of raising AttributeError.
        return self.event_thread is not None and self.event_thread.is_alive()

    def start(self):
        """Create and start the background daemon thread that runs daemon()."""
        self.event_thread = threading.Thread(target=self.daemon)
        self.event_thread.daemon = True
        self.event_thread.name = self.get_thread_name()
        self.event_thread.start()

    def join(self):
        """Block until the collection thread finishes."""
        self.event_thread.join()

    def stopped(self):
        """Return True once stop() has been requested."""
        return not self.should_run

    def stop(self):
        """Request the collection loop to end and wait for the thread when possible."""
        self.should_run = False
        if self.is_alive():
            try:
                self.join()
            except RuntimeError:
                # _collect_logs() calls stop() from the collection thread itself, and Thread.join() raises
                # RuntimeError when a thread tries to join itself. The loop in daemon() exits on its own
                # once it sees stopped() is True.
                pass

    def init_protocols(self):
        """Create the ProtocolUtil and protocol objects used for uploading logs."""
        # The initialization of ProtocolUtil for the log collection thread should be done within the thread itself
        # rather than initializing it in the ExtHandler thread. This is done to avoid any concurrency issues as each
        # thread would now have its own ProtocolUtil object as per the SingletonPerThread model.
        self.protocol_util = get_protocol_util()
        self.protocol = self.protocol_util.get_protocol()

    def daemon(self):
        """
        Thread body: after an initial delay, collect and upload logs every self.period seconds until stopped.

        - An error inside one iteration is logged and that iteration is skipped.
        - Any other error is logged and ends the thread.
        - The cgroups-monitoring flag is set while the loop runs and is always cleared on exit.
        """
        # Delay the first collection on start up to give short-lived VMs (that might be dead before the second
        # collection has a chance to run) an opportunity to produce meaningful logs to collect.
        time.sleep(conf.get_log_collector_initial_delay())

        try:
            CollectLogsHandler.enable_monitor_cgroups_check()
            if self.protocol_util is None or self.protocol is None:
                self.init_protocols()

            while not self.stopped():
                try:
                    self.collect_and_send_logs()
                except Exception as e:
                    logger.error("An error occurred in the log collection thread main loop; "
                                 "will skip the current iteration.\n{0}", ustr(e))
                finally:
                    time.sleep(self.period)
        except Exception as e:
            logger.error("An error occurred in the log collection thread; will exit the thread.\n{0}", ustr(e))
        finally:
            CollectLogsHandler.disable_monitor_cgroups_check()

    def collect_and_send_logs(self):
        """Collect logs and, only if collection succeeded, upload the resulting archive."""
        if self._collect_logs():
            self._send_logs()

    def _collect_logs(self):
        """
        Run the log collector in a transient systemd scope and wait for it to finish.

        If the collector exits with INVALID_CGROUPS_ERRCODE or GRACEFUL_KILL_ERRCODE, this handler is stopped
        until the service restarts. A LogCollection telemetry event is emitted in every case.

        :return: True if the collector ran successfully and its archive could be sized, False otherwise.
        """
        logger.info("Starting log collection...")

        # Invoke the command line tool in the agent to collect logs. The --scope option starts the process as a systemd
        # transient scope unit. The --property option is used to set systemd memory and cpu properties on the scope.
        systemd_cmd = [
            "systemd-run",
            "--unit={0}".format(logcollector.CGROUPS_UNIT),
            "--slice={0}".format(cgroupconfigurator.LOGCOLLECTOR_SLICE),
            "--scope"
        ] + CGroupConfigurator.get_instance().get_logcollector_unit_properties()

        # The log tool is invoked from the current agent's egg (sys.argv[0]) with the "-collect-logs" command
        # line option, using the same Python interpreter with unbuffered output ("-u").
        collect_logs_cmd = [sys.executable, "-u", sys.argv[0], "-collect-logs"]
        final_command = systemd_cmd + collect_logs_cmd

        def exec_command():
            start_time = datetime.datetime.utcnow()
            success = False
            msg = None
            try:
                shellutil.run_command(final_command, log_error=False)
                duration = elapsed_milliseconds(start_time)
                # Raises if the collector did not produce the archive; that is handled as a failure below.
                archive_size = os.path.getsize(COMPRESSED_ARCHIVE_PATH)

                msg = "Successfully collected logs. Archive size: {0} b, elapsed time: {1} ms.".format(archive_size,
                                                                                                    duration)
                logger.info(msg)
                success = True

                return True
            except Exception as e:
                duration = elapsed_milliseconds(start_time)
                err_msg = ustr(e)

                if isinstance(e, CommandError):
                    # pylint has limited (i.e. no) awareness of control flow w.r.t. typing. we disable=no-member
                    # here because we know e must be a CommandError but pylint still considers the case where
                    # e is a different type of exception.
                    err_msg = ustr("Log Collector exited with code {0}").format(
                        e.returncode)  # pylint: disable=no-member

                    if e.returncode == logcollector.INVALID_CGROUPS_ERRCODE:  # pylint: disable=no-member
                        logger.info("Disabling periodic log collection until service restart due to process error.")
                        self.stop()

                    # When the log collector memory limit is exceeded, the agent gracefully exits the process with
                    # this error code. Stop the periodic operation because the condition seems to be persistent.
                    elif e.returncode == logcollector.GRACEFUL_KILL_ERRCODE:  # pylint: disable=no-member
                        logger.info("Disabling periodic log collection until service restart due to exceeded process memory limit.")
                        self.stop()
                    else:
                        logger.info(err_msg)

                msg = "Failed to collect logs. Elapsed time: {0} ms. Error: {1}".format(duration, err_msg)
                # No need to log to the local log since we logged stdout, stderr from the process.

                return False
            finally:
                add_event(
                    name=AGENT_NAME,
                    version=CURRENT_VERSION,
                    op=WALAEventOperation.LogCollection,
                    is_success=success,
                    message=msg,
                    log_event=False)

        return exec_command()

    def _send_logs(self):
        """
        Upload the collected archive through the protocol.

        Failures are logged as warnings and not raised. A LogCollection telemetry event is emitted in every case.
        """
        msg = None
        success = False
        try:
            with open(COMPRESSED_ARCHIVE_PATH, "rb") as fh:
                archive_content = fh.read()
                self.protocol.upload_logs(archive_content)
                msg = "Successfully uploaded logs."
                logger.info(msg)

            success = True
        except Exception as e:
            msg = "Failed to upload logs. Error: {0}".format(ustr(e))
            logger.warn(msg)
        finally:
            add_event(
                name=AGENT_NAME,
                version=CURRENT_VERSION,
                op=WALAEventOperation.LogCollection,
                is_success=success,
                message=msg,
                log_event=False)


def get_log_collector_monitor_handler(controllers):
    """Return a LogCollectorMonitorHandler that polls the given controllers for metrics."""
    return LogCollectorMonitorHandler(controllers)


class LogCollectorMonitorHandler(ThreadHandlerInterface):
    """
    Periodically monitor and checks the Log collector Cgroups and sends telemetry to Kusto.

    - The monitor thread polls every controller's tracked metrics and keeps the maximum value seen per counter.
    - It can optionally log the metrics.
    - It terminates the current process with GRACEFUL_KILL_ERRCODE when a memory limit is exceeded.
    """

    _THREAD_NAME = "LogCollectorMonitorHandler"

    @staticmethod
    def get_thread_name():
        """Return the name given to the monitor thread."""
        return LogCollectorMonitorHandler._THREAD_NAME

    def __init__(self, controllers):
        self.event_thread = None
        self.should_run = True
        self.period = 2  # Log collector monitor runs every 2 secs.
        self.controllers = controllers
        # Maps each metric counter to the largest value observed so far.
        self.max_recorded_metrics = {}
        self.__should_log_metrics = conf.get_cgroup_log_metrics()

    def run(self):
        """Start the monitor thread."""
        self.start()

    def stop(self):
        """Request the monitor loop to end and wait for the thread if it is running."""
        self.should_run = False
        if self.is_alive():
            self.join()

    def join(self):
        """Block until the monitor thread finishes."""
        self.event_thread.join()

    def stopped(self):
        """Return True once stop() has been requested."""
        return not self.should_run

    def is_alive(self):
        """Return True if the monitor thread has been started and is still running."""
        return self.event_thread is not None and self.event_thread.is_alive()

    def start(self):
        """Create and start the background daemon thread that runs daemon()."""
        self.event_thread = threading.Thread(target=self.daemon)
        self.event_thread.daemon = True
        self.event_thread.name = self.get_thread_name()
        self.event_thread.start()

    def daemon(self):
        """Thread body: poll metrics, optionally log them and enforce memory limits every self.period seconds."""
        try:
            while not self.stopped():
                try:
                    metrics = self._poll_resource_usage()
                    if self.__should_log_metrics:
                        self._log_metrics(metrics)
                    self._verify_memory_limit(metrics)
                except Exception as e:
                    logger.error("An error occurred in the log collection monitor thread loop; "
                                 "will skip the current iteration.\n{0}", ustr(e))
                finally:
                    time.sleep(self.period)
        except Exception as e:
            logger.error(
                "An error occurred in the MonitorLogCollectorCgroupsHandler thread; will exit the thread.\n{0}",
                ustr(e))

    def get_max_recorded_metrics(self):
        """Return the dict mapping each metric counter to the maximum value observed so far."""
        return self.max_recorded_metrics

    def _poll_resource_usage(self):
        """
        Collect the tracked metrics from every controller and update the per-counter maximums.

        :return: list of the metrics gathered in this poll.
        """
        metrics = []
        for controller in self.controllers:
            metrics.extend(controller.get_tracked_metrics())

        for metric in metrics:
            current_max = self.max_recorded_metrics.get(metric.counter)
            self.max_recorded_metrics[metric.counter] = metric.value if current_max is None else max(current_max, metric.value)

        return metrics

    def _log_metrics(self, metrics):
        """Write one info log line per metric."""
        for metric in metrics:
            logger.info("Metric {0}/{1} [{2}] = {3}".format(metric.category, metric.counter, metric.instance, metric.value))

    def _verify_memory_limit(self, metrics):
        """
        Compare memory metrics against the log collector limits and terminate the process if any is exceeded.

        The checks are:
        - anon + swap usage (summed) against LOGCOLLECTOR_ANON_MEMORY_LIMIT_FOR_V1_AND_V2;
        - cache usage against LOGCOLLECTOR_CACHE_MEMORY_LIMIT_FOR_V1_AND_V2;
        - memory throttled events against LOGCOLLECTOR_MAX_THROTTLED_EVENTS_FOR_V2.

        Each exceeded limit is logged and reported as a telemetry event.
        """
        current_anon_and_swap_usage = 0
        current_cache_usage = 0
        memory_throttled_events = 0
        for metric in metrics:
            if metric.counter == MetricsCounter.ANON_MEM_USAGE:
                current_anon_and_swap_usage += metric.value
            elif metric.counter == MetricsCounter.SWAP_MEM_USAGE:
                current_anon_and_swap_usage += metric.value
            elif metric.counter == MetricsCounter.CACHE_MEM_USAGE:
                current_cache_usage = metric.value
            elif metric.counter == MetricsCounter.MEM_THROTTLED:
                memory_throttled_events = metric.value

        mem_limit_exceeded = False
        if current_anon_and_swap_usage > LOGCOLLECTOR_ANON_MEMORY_LIMIT_FOR_V1_AND_V2:
            mem_limit_exceeded = True
            msg = "Log collector anon + swap memory limit {0} bytes exceeded. The reported usage is {1} bytes.".format(LOGCOLLECTOR_ANON_MEMORY_LIMIT_FOR_V1_AND_V2, current_anon_and_swap_usage)
            logger.info(msg)
            add_event(name=AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.LogCollection, message=msg)
        if current_cache_usage > LOGCOLLECTOR_CACHE_MEMORY_LIMIT_FOR_V1_AND_V2:
            mem_limit_exceeded = True
            msg = "Log collector cache memory limit {0} bytes exceeded. The reported usage is {1} bytes.".format(LOGCOLLECTOR_CACHE_MEMORY_LIMIT_FOR_V1_AND_V2, current_cache_usage)
            logger.info(msg)
            add_event(name=AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.LogCollection, message=msg)
        if memory_throttled_events > LOGCOLLECTOR_MAX_THROTTLED_EVENTS_FOR_V2:
            mem_limit_exceeded = True
            msg = "Log collector memory throttled events limit {0} exceeded. The reported number of throttled events is {1}.".format(LOGCOLLECTOR_MAX_THROTTLED_EVENTS_FOR_V2, memory_throttled_events)
            logger.info(msg)
            add_event(name=AGENT_NAME, version=CURRENT_VERSION, op=WALAEventOperation.LogCollection, message=msg)

        if mem_limit_exceeded:
            # os._exit terminates the whole process immediately (no cleanup handlers run). The exit code is the one
            # CollectLogsHandler._collect_logs recognizes as "memory limit exceeded".
            os._exit(GRACEFUL_KILL_ERRCODE)