in pyscripts/docker_log_processor.py [0:0]
def process_static_log(self, static_filenames, filter_filenames):
    """
    Read one or more static log files, merge their lines by timestamp,
    and print the merged stream to stdout.

    Args:
        static_filenames: list of single-element sequences (argparse-style
            nargs grouping), each holding the path to one log file.
            Falsy entries are skipped.
        filter_filenames: optional list whose first element is the path to
            a JSON filter file shaped like:
                {
                    "filters":
                    [
                        "Getting next batch",
                        "Obtained next batch"
                    ]
                }
            Any log line containing one of the filter strings is dropped.

    Each kept line is expected to start with a timestamp ending in "Z "
    (parsed with format "%Y-%m-%d %H:%M:%S.%f"); lines that fail parsing
    are echoed with an INVALID_TIMESTAMP/INVALID_LINE marker instead.
    """
    import os
    import json
    import traceback

    split_char = u"\u2588"  # full-block glyph used as a visual column separator
    loglines = []
    max_name_len = 0
    # NOTE: was initialized to "" — a list is the correct type, since a
    # non-empty string fallback would have been iterated per-character.
    filter_list = []
    pytest_owner = ""
    if filter_filenames:
        filter_filename = os.path.abspath(filter_filenames[0])
        try:
            # Context manager guarantees the handle is closed even when
            # json.loads raises (the original leaked it on both paths).
            with open(filter_filename, "r", encoding="utf8") as filter_file:
                filter_json = filter_file.read()
            if filter_json:
                json_data = json.loads(filter_json)
                filter_list = json_data["filters"]
        except Exception:
            self.write_err("Exception processing JSON file: " + filter_filename)
            traceback.print_exc()

    # Find the longest basename so every module name can be padded to the
    # same width and the output columns line up.
    for static_filename in static_filenames:
        if static_filename:
            base_filename = os.path.basename(static_filename[0])
            max_name_len = max(max_name_len, len(base_filename))

    # Read and process every static file.
    for static_filename in static_filenames:
        if not static_filename:
            continue
        static_filename = static_filename[0]
        module_name = os.path.basename(static_filename)
        print("Getting log from file: " + static_filename)
        # Pad the filename so that each is the same length.
        module_name = module_name.ljust(max_name_len)
        try:
            with open(static_filename, encoding="utf8") as log_file:
                read_file = log_file.read().split("\n")
        except Exception as e:
            self.write_err("Exception opening LOG file: " + static_filename)
            # str(e): sibling write_err calls pass strings, not objects.
            self.write_err(str(e))
            return

        # Get and filter each line.
        for log_line in read_file:
            if not log_line:
                continue
            ok_to_log = True
            # The first module that emits a PYTEST line "owns" PYTEST
            # output; PYTEST lines from any other module are suppressed
            # so duplicated test chatter appears only once.
            if "PYTEST" in log_line:
                if not pytest_owner:
                    pytest_owner = module_name
                elif pytest_owner != module_name:
                    ok_to_log = False
            if ok_to_log:
                # "filter_text" — the original name shadowed the builtin.
                for filter_text in filter_list:
                    if filter_text in log_line:
                        ok_to_log = False
            if not ok_to_log:
                continue

            # Made it past filters and PyTest, so log the line.
            log_line_parts = log_line.split("Z ")
            if not log_line_parts:
                continue
            log_data = ""
            num_parts = len(log_line_parts)
            # Handle case where more than one timestamp appears in the
            # line: keep the first as the sort key, rejoin the rest.
            if num_parts > 2:
                for part in range(1, num_parts):
                    log_data += log_line_parts[part] + " "
            elif num_parts == 2:
                log_data = log_line_parts[1]
            if num_parts >= 2:
                try:
                    log_time = DockerLogProcessor.format_date_and_time(
                        log_line_parts[0], "%Y-%m-%d %H:%M:%S.%f"
                    )
                    log_line_object = LogLineObject(
                        log_time, module_name, log_data
                    )
                    loglines.append(log_line_object)
                except Exception:
                    print(
                        "INVALID_TIMESTAMP({}):{}".format(
                            module_name, log_line
                        )
                    )
            else:
                print(
                    "INVALID_LINE({}):{}".format(
                        module_name, log_line
                    )
                )

    # Sort the merged static file lines by timestamp.
    loglines.sort(key=lambda x: x.timestamp)
    # Seed the "previous" timestamp far in the past so the very first
    # line still gets a computable delta.
    last_timestamp = datetime.now() + timedelta(days=-364)
    line_count = 0
    # Display the results to stdout.
    for log_line in loglines:
        logline_timestamp = log_line.timestamp
        if (
            "HORTON: Entering function" in log_line.log_data
            or "HORTON: Exiting function" in log_line.log_data
        ):
            # Function entry/exit markers show the absolute timestamp
            # instead of a delta from the previous line.
            date_delta = logline_timestamp.isoformat(timespec="microseconds")
        else:
            date_delta = self.get_timestamp_delta(
                logline_timestamp.isoformat(timespec="microseconds"),
                last_timestamp.isoformat(timespec="microseconds"),
                line_count,
            )
        line_count += 1
        out_line = (
            log_line.module_name
            + " : "
            + date_delta
            + " "
            + split_char
            + " "
            + log_line.log_data
        )
        last_timestamp = logline_timestamp
        try:
            print(out_line)
        except Exception:
            # stdout may not be able to encode the block glyph or log
            # payload; degrade to ASCII with '#' placeholders.
            print("".join([i if ord(i) < 128 else "#" for i in out_line]))