pyscripts/docker_log_processor.py (266 lines of code) (raw):

# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for
# full license information.
#
# filename: docker_log_processor.py
# author: v-greach@microsoft.com
# created: 01/29/2019
# Rev: 03/05/2019 A

from multiprocessing import Process, Queue, Event
from threading import Thread
from datetime import datetime, timedelta
import traceback
import time
import argparse
import sys


class DockerLogProcessor:
    """Merge, filter and pretty-print Docker module logs.

    Two modes, selected by command-line arguments:
      -staticfile : read one or more saved log files, merge them by
                    timestamp, and print to stdout.
      -modulename : follow one or more live Docker containers, funneling
                    their log streams through a queue to stdout.
    """

    def __init__(self, args):
        """Parse the argument list and run the selected mode.

        Parameters
        ----------
        args : list
            Argument strings, e.g. sys.argv[1:].
        """
        parser = argparse.ArgumentParser(description="Docker Log Processor")
        group = parser.add_mutually_exclusive_group(required=True)
        group.add_argument(
            "-staticfile", action="append", nargs="+", help="filename to read from"
        )
        group.add_argument(
            "-modulename",
            action="append",
            nargs="+",
            help="docker modulename to read from",
        )
        parser.add_argument("-filterfile", nargs=1, help="filename of json filters")
        arguments = parser.parse_args(args)

        if arguments.staticfile:
            self.process_static_log(arguments.staticfile, arguments.filterfile)
        else:
            self.queue = Queue()
            self.logger_thread = Thread(target=self.process_queue)
            self.logger_thread.start()

            self.watcher_processes = []
            # BUGFIX: action="append" with nargs="+" yields a list of
            # lists, so each element must be flattened.  The old code
            # concatenated a *list* into the status string (TypeError).
            for name_group in arguments.modulename:
                for container_name in name_group:
                    print("Getting Log for: " + container_name)
                    new_process = Process(
                        target=self.get_log_from_container,
                        args=(container_name, self.queue),
                    )
                    new_process.start()
                    self.watcher_processes.append(new_process)

    @classmethod
    def format_date_and_time(cls, date_in="", time_format="%Y-%m-%d %H:%M:%S.%f"):
        """Convert a timestamp string into a datetime.

        Parameters
        ----------
        date_in : str
            Timestamp string; may contain a "T" date/time separator and
            more than 6 fractional-second digits.  May be empty.
        time_format : str
            strptime format used for the conversion.

        Returns
        -------
        datetime
            Parsed timestamp, or now() when date_in is empty.
        """
        if not date_in:
            # BUGFIX: previously returned a *formatted string* here while
            # returning a datetime below; callers sort on and call
            # .isoformat() on the result, so always return a datetime.
            return datetime.now()
        date_in = date_in.replace("T", " ")
        # strptime's %f accepts at most 6 fractional digits, i.e. 26
        # characters total for this format - truncate extra precision.
        if len(date_in) > 26:
            date_in = date_in[:26]
        return datetime.strptime(date_in, time_format)

    @staticmethod
    def write_err(msg):
        """Write a message to both stderr and stdout."""
        print(msg, file=sys.stderr)
        print(msg)

    @staticmethod
    def get_log_from_container(container_name, queue):
        """Stream timestamped log lines from a Docker container into queue.

        Runs in a worker process; follows the container's log stream
        forever, parsing each line into a LogLineObject.

        Parameters
        ----------
        container_name : str
            Name of the Docker container.
        queue : multiprocessing.Queue
            Destination queue for parsed LogLineObjects.
        """
        # Local import: only this worker needs the docker SDK, and the
        # rest of the module stays importable without it installed.
        import docker

        client = docker.from_env()
        container = client.containers.get(container_name)
        for log_line in container.logs(
            stream=True, tail=0, follow=True, timestamps=True
        ):
            try:
                log_line = log_line.decode("utf8").strip()
                log_line_parts = log_line.split("Z ")
                num_parts = len(log_line_parts)
                if num_parts > 2:
                    # More than one timestamp on the line - keep the rest
                    # of the parts joined as the message body.
                    log_data = " ".join(log_line_parts[1:]) + " "
                elif num_parts == 2:
                    log_data = log_line_parts[1]
                else:
                    # BUGFIX: no "Z " separator means no timestamp; the
                    # old code raised IndexError here (swallowed below as
                    # a spurious error message).  Skip such lines.
                    continue
                queue.put(
                    LogLineObject(
                        DockerLogProcessor.format_date_and_time(log_line_parts[0]),
                        container_name,
                        log_data,
                    )
                )
            except Exception as e:
                DockerLogProcessor.write_err(
                    "Exception getting container log_line from: " + container_name
                )
                DockerLogProcessor.write_err(e)

    def split(self, string, delimiters):
        """Split a string on any of several delimiters.

        Parameters
        ----------
        string : str
            String to split.
        delimiters : iterable of str
            Delimiters; each is regex-escaped before use.

        Returns
        -------
        list of str
        """
        import re

        regex_pattern = "|".join(map(re.escape, delimiters))
        return re.split(regex_pattern, string)

    def get_timestamp_delta(self, date_one, date_two, line_count=0, line_mod=100):
        """Blank out the leading fields of date_one that match date_two.

        Makes consecutive near-identical timestamps easy to scan: fields
        (year, month, day, hour, minute, second, microsecond) equal to
        the previous timestamp are replaced by spaces.  Every line_mod
        lines (line_count % line_mod == 0) the full date_one is printed
        unchanged as a reference point.

        Parameters
        ----------
        date_one : str
            Current timestamp string ("%Y-%m-%d %H:%M:%S.%f" layout).
        date_two : str
            Previous timestamp string to diff against.
        line_count : int
            Running output line counter.
        line_mod : int
            Period of full-timestamp lines; 0 disables the periodic reset.

        Returns
        -------
        str
        """
        if line_mod != 0 and line_count % line_mod == 0:
            return date_one

        delimiters = (".", "-", " ", ":")
        fields_one = self.split(date_one, delimiters)
        fields_two = self.split(date_two, delimiters)

        time_delta_str = ""
        for index, field in enumerate(fields_one):
            if index < len(fields_two) and field == fields_two[index]:
                # Field unchanged - keep column alignment with blanks.
                time_delta_str += " " * len(field)
            else:
                time_delta_str += field
                # Re-insert the separator that split() consumed.
                if index < 2:
                    time_delta_str += "-"
                elif index == 2:
                    time_delta_str += " "
                elif 2 < index < 5:
                    time_delta_str += ":"
                elif index == 5:
                    time_delta_str += "."
        return time_delta_str

    def _print_log_line(self, log_line, last_timestamp, line_count):
        """Format one LogLineObject and print it; return its timestamp.

        Shared by the static-file and live-queue paths (previously
        duplicated in both).  HORTON function-trace lines always get a
        full timestamp; other lines get the blanked delta form.
        """
        split_char = u"\u2588"
        logline_timestamp = log_line.timestamp
        if (
            "HORTON: Entering function" in log_line.log_data
            or "HORTON: Exiting function" in log_line.log_data
        ):
            date_delta = logline_timestamp.isoformat(timespec="microseconds")
        else:
            date_delta = self.get_timestamp_delta(
                logline_timestamp.isoformat(timespec="microseconds"),
                last_timestamp.isoformat(timespec="microseconds"),
                line_count,
            )
        out_line = (
            log_line.module_name
            + " : "
            + date_delta
            + " "
            + split_char
            + " "
            + log_line.log_data
        )
        try:
            print(out_line)
        except Exception:
            # stdout can't encode the line - degrade to ASCII.
            print("".join([c if ord(c) < 128 else "#" for c in out_line]))
        return logline_timestamp

    def process_static_log(self, static_filenames, filter_filenames):
        """Read, filter, merge, sort and print one or more static logs.

        Parameters
        ----------
        static_filenames : list
            List of [filename] lists (argparse append/nargs layout).
        filter_filenames : list or None
            Optional [filename] of a JSON file shaped like
            {"filters": ["Getting next batch", "Obtained next batch"]};
            lines containing any filter substring are dropped.
        """
        import os
        import json

        loglines = []
        max_name_len = 0
        filter_list = ""
        pytest_owner = ""

        if filter_filenames:
            filter_filename = os.path.abspath(filter_filenames[0])
            try:
                # 'with' guarantees the handle is closed on error
                # (the old open().read() leaked it).
                with open(filter_filename, "r", encoding="utf8") as filter_file:
                    filter_json = filter_file.read()
                if filter_json:
                    json_data = json.loads(filter_json)
                    filter_list = json_data["filters"]
            except Exception:
                self.write_err("Exception processing JSON file: " + filter_filename)
                traceback.print_exc()

        # Longest basename among the static files - used to pad module
        # names so the output columns line up.
        for static_filename in static_filenames:
            if static_filename:
                base_filename = os.path.basename(static_filename[0])
                max_name_len = max(max_name_len, len(base_filename))

        # Read and parse every static file.
        for static_filename in static_filenames:
            if not static_filename:
                continue
            static_filename = static_filename[0]
            print("Getting log from file: " + static_filename)
            # Pad so every module name has the same width.
            module_name = os.path.basename(static_filename).ljust(max_name_len)
            try:
                with open(static_filename, encoding="utf8") as log_file:
                    read_file = log_file.read().split("\n")
            except Exception as e:
                self.write_err("Exception opening LOG file: " + static_filename)
                self.write_err(e)
                return

            for log_line in read_file:
                if not log_line:
                    continue

                ok_to_log = True
                # Only one module may own PYTEST lines; the first module
                # seen claims them and duplicates from others are dropped.
                if "PYTEST" in log_line:
                    if not pytest_owner:
                        pytest_owner = module_name
                    elif pytest_owner != module_name:
                        ok_to_log = False
                if ok_to_log:
                    # 'filter_text' (was 'filter') - avoid shadowing the
                    # builtin.
                    for filter_text in filter_list:
                        if filter_text in log_line:
                            ok_to_log = False
                            break
                if not ok_to_log:
                    continue

                # Made it past filters and PyTest - parse the line.
                log_line_parts = log_line.split("Z ")
                num_parts = len(log_line_parts)
                if num_parts < 2:
                    print("INVALID_LINE({}):{}".format(module_name, log_line))
                    continue
                if num_parts > 2:
                    # More than one timestamp - keep the remainder joined.
                    log_data = " ".join(log_line_parts[1:]) + " "
                else:
                    log_data = log_line_parts[1]
                try:
                    log_time = DockerLogProcessor.format_date_and_time(
                        log_line_parts[0], "%Y-%m-%d %H:%M:%S.%f"
                    )
                    loglines.append(LogLineObject(log_time, module_name, log_data))
                except Exception:
                    print("INVALID_TIMESTAMP({}):{}".format(module_name, log_line))

        # Merge all files chronologically.
        loglines.sort(key=lambda line: line.timestamp)

        # Seed far in the past so the first line prints a full timestamp.
        last_timestamp = datetime.now() + timedelta(days=-364)
        for line_count, log_line in enumerate(loglines):
            last_timestamp = self._print_log_line(log_line, last_timestamp, line_count)

    def process_queue(self):
        """Consumer thread: print LogLineObjects as workers queue them."""
        # Seed far in the past so the first line prints a full timestamp.
        last_timestamp = datetime.now() + timedelta(days=-364)
        line_count = 0
        while True:
            log_line = self.queue.get()
            last_timestamp = self._print_log_line(log_line, last_timestamp, line_count)
            line_count += 1


class LogLineObject:
    """Value object holding one parsed log line."""

    def __init__(self, timestamp, module_name="", log_data=""):
        self.timestamp = timestamp      # datetime the line was emitted
        self.module_name = module_name  # padded source file / container name
        self.log_data = log_data        # message text after the timestamp


if __name__ == "__main__":
    log_processor = DockerLogProcessor(sys.argv[1:])