azurelinuxagent/ga/logcollector.py (277 lines of code) (raw):

# Microsoft Azure Linux Agent # # Copyright 2020 Microsoft Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Requires Python 2.6+ and Openssl 1.0+ # import glob import logging import os import subprocess import time import zipfile from datetime import datetime from heapq import heappush, heappop from azurelinuxagent.common.conf import get_lib_dir, get_ext_log_dir, get_agent_log_file from azurelinuxagent.common.event import initialize_event_logger_vminfo_common_parameters_and_protocol, add_event, WALAEventOperation from azurelinuxagent.common.future import ustr from azurelinuxagent.ga.logcollector_manifests import MANIFEST_NORMAL, MANIFEST_FULL # Please note: be careful when adding agent dependencies in this module. # This module uses its own logger and logs to its own file, not to the agent log. 
from azurelinuxagent.common.protocol.goal_state import GoalStateProperties
from azurelinuxagent.common.protocol.util import get_protocol_util

_EXTENSION_LOG_DIR = get_ext_log_dir()
_AGENT_LIB_DIR = get_lib_dir()
_AGENT_LOG = get_agent_log_file()

_LOG_COLLECTOR_DIR = os.path.join(_AGENT_LIB_DIR, "logcollector")
_TRUNCATED_FILES_DIR = os.path.join(_LOG_COLLECTOR_DIR, "truncated")

OUTPUT_RESULTS_FILE_PATH = os.path.join(_LOG_COLLECTOR_DIR, "results.txt")
COMPRESSED_ARCHIVE_PATH = os.path.join(_LOG_COLLECTOR_DIR, "logs.zip")

CGROUPS_UNIT = "collect-logs.scope"

GRACEFUL_KILL_ERRCODE = 3
INVALID_CGROUPS_ERRCODE = 2

# Files that must be collected even if they do not appear in the manifest;
# earlier entries get higher collection priority (see _get_file_priority).
_MUST_COLLECT_FILES = [
    _AGENT_LOG,
    os.path.join(_AGENT_LIB_DIR, "waagent_status.json"),
    os.path.join(_AGENT_LIB_DIR, "history", "*.zip"),
    os.path.join(_EXTENSION_LOG_DIR, "*", "*"),
    os.path.join(_EXTENSION_LOG_DIR, "*", "*", "*"),
    "{0}.*".format(_AGENT_LOG)  # any additional waagent.log files (e.g., waagent.log.1.gz)
]

_FILE_SIZE_LIMIT = 30 * 1024 * 1024  # 30 MB
_UNCOMPRESSED_ARCHIVE_SIZE_LIMIT = 150 * 1024 * 1024  # 150 MB

_LOGGER = logging.getLogger(__name__)


class LogCollector(object):
    """
    Collects the log files described by the normal/full manifest (plus the
    must-collect list) into a size-capped, compressed zip archive. Uses its
    own logger writing to OUTPUT_RESULTS_FILE_PATH, not the agent log.
    """

    _TRUNCATED_FILE_PREFIX = "truncated_"

    def __init__(self, is_full_mode=False):
        """
        :param is_full_mode: If True, use the full collection manifest; otherwise the normal one.
        """
        self._is_full_mode = is_full_mode
        self._manifest = MANIFEST_FULL if is_full_mode else MANIFEST_NORMAL
        self._must_collect_files = self._expand_must_collect_files()
        self._create_base_dirs()
        self._set_logger()

    @staticmethod
    def _mkdir(dirname):
        # Create the directory (and parents) only if it doesn't already exist.
        if not os.path.isdir(dirname):
            os.makedirs(dirname)

    @staticmethod
    def _reset_file(filepath):
        # Truncate the file to zero bytes (creating it if needed).
        with open(filepath, "wb") as out_file:
            out_file.write("".encode("utf-8"))

    @staticmethod
    def _create_base_dirs():
        LogCollector._mkdir(_LOG_COLLECTOR_DIR)
        LogCollector._mkdir(_TRUNCATED_FILES_DIR)

    @staticmethod
    def _set_logger():
        # Route this module's logger to the results file, with UTC timestamps.
        _f_handler = logging.FileHandler(OUTPUT_RESULTS_FILE_PATH, encoding="utf-8")
        _f_format = logging.Formatter(fmt='%(asctime)s %(levelname)s %(message)s',
                                      datefmt=u'%Y-%m-%dT%H:%M:%SZ')
        _f_format.converter = time.gmtime
        _f_handler.setFormatter(_f_format)
        _LOGGER.addHandler(_f_handler)
        _LOGGER.setLevel(logging.INFO)

    @staticmethod
    def initialize_telemetry():
        """
        Fetch a minimal goal state (role config + hosting env only) and use it to
        initialize the common telemetry event parameters.
        """
        protocol = get_protocol_util().get_protocol(init_goal_state=False)
        protocol.client.reset_goal_state(
            goal_state_properties=GoalStateProperties.RoleConfig | GoalStateProperties.HostingEnv)
        # Initialize the common parameters for telemetry events
        initialize_event_logger_vminfo_common_parameters_and_protocol(protocol)

    @staticmethod
    def _run_shell_command(command, stdout=subprocess.PIPE, log_output=False):
        """
        Runs a shell command in a subprocess, logs any errors to the log file, enables changing the stdout stream,
        and logs the output of the command to the log file if indicated by the `log_output` parameter.
        :param command: Shell command to run
        :param stdout: Where to write the output of the command
        :param log_output: If true, log the command output to the log file
        """
        def format_command(cmd):
            # FIX: previously returned the closed-over `command` instead of `cmd`
            # in the non-list branch.
            return " ".join(cmd) if isinstance(cmd, list) else cmd

        def _encode_command_output(output):
            return ustr(output, encoding="utf-8", errors="backslashreplace")

        try:
            process = subprocess.Popen(command, stdout=stdout, stderr=subprocess.PIPE, shell=False)
            # Renamed from `stdout, stderr` to avoid shadowing the `stdout` parameter.
            command_stdout, command_stderr = process.communicate()
            return_code = process.returncode
        except Exception as e:
            error_msg = u"Command [{0}] raised unexpected exception: [{1}]".format(format_command(command), ustr(e))
            _LOGGER.error(error_msg)
            return

        if return_code != 0:
            encoded_stdout = _encode_command_output(command_stdout)
            encoded_stderr = _encode_command_output(command_stderr)
            error_msg = "Command: [{0}], return code: [{1}], stdout: [{2}] stderr: [{3}]".format(
                format_command(command), return_code, encoded_stdout, encoded_stderr)
            _LOGGER.error(error_msg)
            return

        if log_output:
            msg = "Output of command [{0}]:\n{1}".format(format_command(command),
                                                         _encode_command_output(command_stdout))
            _LOGGER.info(msg)

    @staticmethod
    def _expand_must_collect_files():
        # Match the regexes from the MUST_COLLECT_FILES list to existing file paths on disk.
        manifest = []
        for path in _MUST_COLLECT_FILES:
            manifest.extend(sorted(glob.glob(path)))
        return manifest

    def _read_manifest(self):
        return self._manifest.splitlines()

    @staticmethod
    def _process_ll_command(folder):
        # "ll" manifest command: log a long-format directory listing.
        LogCollector._run_shell_command(["ls", "-alF", folder], log_output=True)

    @staticmethod
    def _process_echo_command(message):
        # "echo" manifest command: write the message to the results file.
        _LOGGER.info(message)

    @staticmethod
    def _process_copy_command(path):
        # "copy" manifest command: expand the glob and return matching paths.
        file_paths = glob.glob(path)
        for file_path in file_paths:
            _LOGGER.info(file_path)
        return file_paths

    @staticmethod
    def _convert_file_name_to_archive_name(file_name):
        # File name is the name of the file on disk, whereas archive name is the name of that same file in the
        # archive. For non-truncated files: /var/log/waagent.log on disk becomes var/log/waagent.log in archive
        # (leading separator is removed by the archive).
        # For truncated files: /var/lib/waagent/logcollector/truncated/var/log/syslog.1 on disk becomes
        # truncated_var_log_syslog.1 in the archive.
        if file_name.startswith(_TRUNCATED_FILES_DIR):
            original_file_path = file_name[len(_TRUNCATED_FILES_DIR):].lstrip(os.path.sep)
            archive_file_name = LogCollector._TRUNCATED_FILE_PREFIX + original_file_path.replace(os.path.sep, "_")
            return archive_file_name
        else:
            return file_name.lstrip(os.path.sep)

    @staticmethod
    def _remove_uncollected_truncated_files(files_to_collect):
        # After log collection is completed, see if there are any old truncated files which were not collected
        # and remove them since they probably won't be collected in the future. This is possible when the
        # original file got deleted, so there is no need to keep its truncated version anymore.
        truncated_files = os.listdir(_TRUNCATED_FILES_DIR)

        for file_path in truncated_files:
            full_path = os.path.join(_TRUNCATED_FILES_DIR, file_path)
            if full_path not in files_to_collect:
                if os.path.isfile(full_path):
                    os.remove(full_path)

    @staticmethod
    def _expand_parameters(manifest_data):
        # Substitute the $LIB_DIR/$LOG_DIR/$AGENT_LOG placeholders in every manifest line.
        _LOGGER.info("Using %s as $LIB_DIR", _AGENT_LIB_DIR)
        _LOGGER.info("Using %s as $LOG_DIR", _EXTENSION_LOG_DIR)
        _LOGGER.info("Using %s as $AGENT_LOG", _AGENT_LOG)

        new_manifest = []
        for line in manifest_data:
            new_line = line.replace("$LIB_DIR", _AGENT_LIB_DIR)
            new_line = new_line.replace("$LOG_DIR", _EXTENSION_LOG_DIR)
            new_line = new_line.replace("$AGENT_LOG", _AGENT_LOG)
            new_manifest.append(new_line)

        return new_manifest

    def _process_manifest_file(self):
        """
        Parse the manifest and return the set of file paths selected by "copy" entries.
        "ll" and "echo" entries are executed for their logging side effect only.
        """
        files_to_collect = set()
        data = self._read_manifest()
        manifest_entries = LogCollector._expand_parameters(data)

        for entry in manifest_entries:
            # The entry can be one of the four flavours:
            # 1) ll,/etc/udev/rules.d -- list out contents of the folder and store to results file
            # 2) echo,### Gathering Configuration Files ### -- print message to results file
            # 3) copy,/var/lib/waagent/provisioned -- add file to list of files to be collected
            # 4) diskinfo, -- ignore commands from manifest other than ll, echo, and copy for now
            contents = entry.split(",")
            if len(contents) != 2:
                # If it's not a comment or an empty line, it's a malformed entry
                if not entry.startswith("#") and len(entry.strip()) > 0:
                    _LOGGER.error("Couldn't parse \"%s\"", entry)
                continue

            command, value = contents

            if command == "ll":
                self._process_ll_command(value)
            elif command == "echo":
                self._process_echo_command(value)
            elif command == "copy":
                files_to_collect.update(self._process_copy_command(value))

        return files_to_collect

    @staticmethod
    def _truncate_large_file(file_path):
        # Truncate large file to size limit (keep freshest entries of the file), copy file to a temporary location
        # and update file path in list of files to collect
        try:
            # Binary files cannot be truncated, don't include large binary files
            ext = os.path.splitext(file_path)[1]
            if ext in [".gz", ".zip", ".xz"]:
                _LOGGER.warning("Discarding large binary file %s", file_path)
                return None

            truncated_file_path = os.path.join(_TRUNCATED_FILES_DIR, file_path.replace(os.path.sep, "_"))
            if os.path.exists(truncated_file_path):
                original_file_mtime = os.path.getmtime(file_path)
                truncated_file_mtime = os.path.getmtime(truncated_file_path)

                # If the original file hasn't been updated since the truncated file, it means there were no changes
                # and we don't need to truncate it again.
                if original_file_mtime < truncated_file_mtime:
                    return truncated_file_path

            # Get the last N bytes of the file
            with open(truncated_file_path, "w+") as fh:
                LogCollector._run_shell_command(["tail", "-c", str(_FILE_SIZE_LIMIT), file_path], stdout=fh)

            return truncated_file_path
        except OSError as e:
            _LOGGER.error("Failed to truncate large file: %s", ustr(e))
            return None

    def _get_file_priority(self, file_entry):
        # The sooner the file appears in the must collect list, the bigger its priority.
        # Priority is higher the lower the number (0 is highest priority).
        try:
            return self._must_collect_files.index(file_entry)
        except ValueError:
            # Doesn't matter, file is not in the must collect list, assign a low priority
            return 999999999

    def _get_priority_files_list(self, file_list):
        # Given a list of files to collect, determine if they show up in the must collect list and build a priority
        # queue. The queue will determine the order in which the files are collected, highest priority files first.
        priority_file_queue = []
        for file_entry in file_list:
            priority = self._get_file_priority(file_entry)
            heappush(priority_file_queue, (priority, file_entry))

        return priority_file_queue

    def _get_final_list_for_archive(self, priority_file_queue):
        # Given a priority queue of files to collect, add one by one while the archive size is under the size limit.
        # If a single file is over the file size limit, truncate it before adding it to the archive.
        _LOGGER.info("### Preparing list of files to add to archive ###")
        total_uncompressed_size = 0
        final_files_to_collect = []

        while priority_file_queue:
            try:
                file_path = heappop(priority_file_queue)[1]  # (priority, file_path)
                file_size = min(os.path.getsize(file_path), _FILE_SIZE_LIMIT)

                if total_uncompressed_size + file_size > _UNCOMPRESSED_ARCHIVE_SIZE_LIMIT:
                    _LOGGER.warning("Archive too big, done with adding files.")
                    break

                if os.path.getsize(file_path) <= _FILE_SIZE_LIMIT:
                    final_files_to_collect.append(file_path)
                    total_uncompressed_size += file_size
                    _LOGGER.info("Adding file %s, size %s b", file_path, file_size)
                else:
                    truncated_file_path = self._truncate_large_file(file_path)
                    if truncated_file_path:
                        _LOGGER.info("Adding truncated file %s, size %s b", truncated_file_path, file_size)
                        final_files_to_collect.append(truncated_file_path)
                        total_uncompressed_size += file_size
            except IOError as e:
                if e.errno == 2:  # [Errno 2] No such file or directory
                    _LOGGER.warning("File %s does not exist, skipping collection for this file", file_path)

        msg = "Uncompressed archive size is {0} b".format(total_uncompressed_size)
        _LOGGER.info(msg)
        add_event(op=WALAEventOperation.LogCollection, message=msg)

        return final_files_to_collect, total_uncompressed_size

    def _create_list_of_files_to_collect(self):
        # The final list of files to be collected by zip is created in three steps:
        # 1) Parse given manifest file, expanding wildcards and keeping a list of files that exist on disk
        # 2) Assign those files a priority depending on whether they are in the must collect file list.
        # 3) In priority order, add files to the final list to be collected, until the size of the archive is under
        #    the size limit.
        parsed_file_paths = self._process_manifest_file()
        prioritized_file_paths = self._get_priority_files_list(parsed_file_paths)
        files_to_collect, total_uncompressed_size = self._get_final_list_for_archive(prioritized_file_paths)
        return files_to_collect, total_uncompressed_size

    def collect_logs_and_get_archive(self):
        """
        Public method that collects necessary log files in a compressed zip archive.
        :return: Returns the path of the collected compressed archive
        """
        files_to_collect = []
        total_uncompressed_size = 0

        try:
            # Clear previous run's output and create base directories if they don't exist already.
            self._create_base_dirs()
            LogCollector._reset_file(OUTPUT_RESULTS_FILE_PATH)
            start_time = datetime.utcnow()
            _LOGGER.info("Starting log collection at %s", start_time.strftime("%Y-%m-%dT%H:%M:%SZ"))
            _LOGGER.info("Using log collection mode %s", "full" if self._is_full_mode else "normal")

            files_to_collect, total_uncompressed_size = self._create_list_of_files_to_collect()
            _LOGGER.info("### Creating compressed archive ###")

            compressed_archive = None

            def handle_add_file_to_archive_error(error_count, max_errors, file_to_collect, exception):
                # Tolerate up to max_errors individual file failures before aborting the whole run.
                error_count += 1
                if error_count >= max_errors:
                    raise Exception("Too many errors, giving up. Last error: {0}".format(ustr(exception)))
                else:
                    _LOGGER.warning("Failed to add file %s to the archive: %s", file_to_collect, ustr(exception))
                return error_count

            try:
                compressed_archive = zipfile.ZipFile(COMPRESSED_ARCHIVE_PATH, "w", compression=zipfile.ZIP_DEFLATED)

                max_errors = 8
                error_count = 0

                for file_to_collect in files_to_collect:
                    try:
                        archive_file_name = LogCollector._convert_file_name_to_archive_name(file_to_collect)
                        compressed_archive.write(file_to_collect.encode("utf-8"), arcname=archive_file_name)
                    except IOError as e:
                        if e.errno == 2:  # [Errno 2] No such file or directory
                            _LOGGER.warning("File %s does not exist, skipping collection for this file",
                                            file_to_collect)
                        else:
                            error_count = handle_add_file_to_archive_error(error_count, max_errors, file_to_collect, e)
                    except Exception as e:
                        error_count = handle_add_file_to_archive_error(error_count, max_errors, file_to_collect, e)

                compressed_archive_size = os.path.getsize(COMPRESSED_ARCHIVE_PATH)
                _LOGGER.info("Successfully compressed files. Compressed archive size is %s b", compressed_archive_size)

                end_time = datetime.utcnow()
                duration = end_time - start_time
                elapsed_ms = int(((duration.days * 24 * 60 * 60 + duration.seconds) * 1000) +
                                 (duration.microseconds / 1000.0))
                # FIX: previously logged end_time.utcnow() (a *new* "now"), not the captured end_time.
                _LOGGER.info("Finishing log collection at %s", end_time.strftime("%Y-%m-%dT%H:%M:%SZ"))
                _LOGGER.info("Elapsed time: %s ms", elapsed_ms)

                compressed_archive.write(OUTPUT_RESULTS_FILE_PATH.encode("utf-8"), arcname="results.txt")
            finally:
                if compressed_archive is not None:
                    compressed_archive.close()

            return COMPRESSED_ARCHIVE_PATH, total_uncompressed_size
        except Exception as e:
            msg = "Failed to collect logs: {0}".format(ustr(e))
            _LOGGER.error(msg)
            raise
        finally:
            self._remove_uncollected_truncated_files(files_to_collect)