utils/tb_log_parser.py (194 lines of code) (raw):
#!/usr/bin/env python3
import calendar
import os
import pickle
import time
import click
# tensorboardX has a dependency conflict with protobuf versions when installed via poetry.
# Before running this script, install it manually: `poetry run pip install tensorboardX`.
import tensorboardX as tb # type: ignore
def get_wall_time(date_str, time_str):
    """Convert a ``YYYY-MM-DD`` date and ``HH:MM:SS`` time into epoch seconds.

    The two strings are joined with a single space, parsed with
    ``time.strptime`` and converted with ``calendar.timegm``, i.e. the
    timestamp is interpreted as UTC.

    Raises:
        ValueError: if the joined string does not match ``%Y-%m-%d %H:%M:%S``.
    """
    stamp = " ".join((date_str, time_str))
    parsed = time.strptime(stamp, "%Y-%m-%d %H:%M:%S")
    return calendar.timegm(parsed)
class JobMonitor:
    """Mirror the metrics of one training job's ``train.log`` into TensorBoard.

    The monitor remembers how far it has read (line index, file modification
    time, gap statistics, per-file progress of the ``avg-*.log`` files) in a
    pickle file inside the TensorBoard directory, so a restarted process
    resumes where the previous one stopped instead of re-emitting scalars.
    """

    def __init__(self, job_path, prefix):
        """Set up paths, restore persisted state if present, open the writer.

        Args:
            job_path: directory of the job; TensorBoard data goes to
                ``job_path + "/tb"``.
            prefix: sub-directory of ``job_path`` that contains ``train.log``
                (may be an empty string).
        """
        self.job_path = job_path
        self.tb_logdir = job_path + "/tb"
        self.train_log = os.path.join(job_path, prefix, "train.log")
        self.prefix = prefix
        # Modification time of train.log at the last completed update.
        self.last_update_time = 0
        # Index of the last line of train.log that was seen; -1 means none.
        self.last_update_line = -1
        # Number of device ids found after the most recent "--devices" token.
        self.gpus = 0
        # Sentence counter of the previous training line (for the diff scalar).
        self.sen_last = 0
        # State used by wall_time_minus_gaps().
        self.last_wall_time = None
        self.gaps = 0
        self.avg_gaps = 0
        self.gaps_num = 0
        # Maps "avg-*" file stem -> highest step already written.
        self.avg_status = {}
        self.pickle_file = self.tb_logdir + "/monitor-status.pickle"
        if os.path.exists(self.pickle_file):
            # NOTE(review): pickle.load can execute arbitrary code. This is only
            # safe while the job directory is writable by trusted users only.
            with open(self.pickle_file, "rb") as f:
                (
                    self.last_update_time,
                    self.last_update_line,
                    self.gpus,
                    self.sen_last,
                    self.avg_status,
                    self.last_wall_time,
                    self.gaps,
                    self.avg_gaps,
                    self.gaps_num,
                ) = pickle.load(f)
        self.writer = tb.SummaryWriter(self.tb_logdir)

    def wall_time_minus_gaps(self, wall_time):
        """Return ``wall_time`` with long pauses between log lines removed.

        The first call only records ``wall_time`` and returns it unchanged.
        Afterwards, a distance of more than 1200 seconds to the previous
        timestamp is treated as a pause: it is added to the accumulated gap,
        reduced by the running average of the ordinary distances. Shorter
        distances update that running average instead.
        """
        if self.last_wall_time is None:
            self.last_wall_time = wall_time
            return wall_time
        thisgap = wall_time - self.last_wall_time
        if thisgap > 1200:  # 20 minutes
            self.gaps += thisgap - self.avg_gaps
        else:
            self.gaps_num += 1
            self.avg_gaps = ((self.avg_gaps * (self.gaps_num - 1)) + thisgap) / self.gaps_num
        self.last_wall_time = wall_time
        return wall_time - self.gaps

    def parse_train(self, line):
        """Parse one training progress line and write its scalars.

        The line is split on whitespace and consumed positionally; presumably
        it is shaped like
        ``[DATE TIME] Ep. N : Up. N : Sen. N : Cost X : Time Xs : X words/s``
        with an optional trailing ``: <label> X`` learning-rate group
        (TODO confirm against the trainer's log format).

        Returns:
            The update number (``int``) parsed from the line.

        Raises:
            ValueError: if the line has too few tokens, a numeric field does
                not parse, or the optional tail is not exactly three tokens.
        """
        # Split once; the original split the same line twice.
        words = line.split()
        strdate, strtime, _ep, ep, _, _up, up, *rest = words
        # Drop the first character of the date token and the last of the time token.
        real_wall_time = get_wall_time(strdate[1:], strtime[:-1])
        wall_time = self.wall_time_minus_gaps(real_wall_time)

        ep = int(ep)
        up = int(up)
        self.writer.add_scalar("train/epoch", ep, up, wall_time)
        # The wall-clock scalar deliberately uses the unadjusted timestamp.
        self.writer.add_scalar("train/wall-clock", real_wall_time, up, real_wall_time)

        _, _sen, sen, _, _cost, cost, *rest = rest
        sen = int(sen.replace(",", ""))  # the counter may contain thousands separators
        cost = float(cost)
        self.writer.add_scalar("train/sentences", sen, up, wall_time)
        self.writer.add_scalar("train/sentences-diff", sen - self.sen_last, up, wall_time)
        self.sen_last = sen
        self.writer.add_scalar("train/cost", cost, up, wall_time)

        _, _time, t, _, speed, _words_per_sec, *rest = rest
        t = float(t[:-1])  # drop the trailing character (presumably a unit suffix)
        self.writer.add_scalar("train/time[sec]", t, up, wall_time)
        speed = float(speed)
        self.writer.add_scalar("train/speed[words per sec]", speed, up, wall_time)

        # lr is optional
        if rest:
            _, _lr, lr = rest
            lr = float(lr)
            self.writer.add_scalar("train/learning_rate", lr, up, wall_time)

        self.writer.add_scalar("train/gpus", self.gpus, up, wall_time)
        return up

    def parse_valid(self, line):
        """Parse one ``[valid]`` line and write the metric and its stall count.

        Lines that contain neither ``Ep.`` nor ``Up.`` are ignored after their
        timestamp has been fed to the gap tracker. For the remaining lines the
        10th token is used as metric name, the 12th as value and the 15th as
        stall counter, where the literal ``best`` is recorded as 0.

        Raises:
            ValueError: if a parsed line has too few tokens or a numeric field
                does not parse.
        """
        words = line.split()
        strdate, strtime, *_ = words
        wall_time = get_wall_time(strdate[1:], strtime[:-1])
        wall_time = self.wall_time_minus_gaps(wall_time)

        # do not parse lines like
        # [2021-04-16 22:29:19] [valid] [valid] First sentence's tokens as scored:
        if line.find("Ep.") == -1 and line.find("Up.") == -1:
            return

        _, _, _, _ep, ep, _, _up, up, _, metric, _, value, _, _stalled, x, *_times = words
        up = int(up)
        value = float(value)
        self.writer.add_scalar("valid/" + metric, value, up, wall_time)
        x = 0 if x == "best" else int(x)
        self.writer.add_scalar("valid/" + metric + "_stalled", x, up, wall_time)

    def save_last_update(self):
        """Record the current mtime of ``train.log`` and persist all state."""
        self.last_update_time = os.path.getmtime(self.train_log)
        # The tuple order must match the unpacking in __init__.
        with open(self.pickle_file, "wb") as f:
            pickle.dump(
                (
                    self.last_update_time,
                    self.last_update_line,
                    self.gpus,
                    self.sen_last,
                    self.avg_status,
                    self.last_wall_time,
                    self.gaps,
                    self.avg_gaps,
                    self.gaps_num,
                ),
                f,
            )

    def update_needed(self):
        """Return True when ``train.log`` was modified since the last update."""
        t = os.path.getmtime(self.train_log)
        if t > self.last_update_time:
            print(" current modification time:", t, "last:", self.last_update_time)
            return True
        return False

    def update_loop(self):
        """Process the ``avg-*.log`` files and any new lines of ``train.log``.

        ``train.log`` is only read when its modification time changed. Lines up
        to ``last_update_line`` are skipped; the remaining ones are dispatched
        to the device counter, ``parse_train`` or ``parse_valid``. Afterwards
        the position and state are persisted.
        """
        self.update_all_avg()
        if not self.update_needed():
            return

        # Start at -1 so that an empty log does not mark line 0 as processed
        # (which would make the first real line be skipped later on).
        last_index = -1
        with open(self.train_log, "r") as f:
            for last_index, line in enumerate(f):
                if last_index <= self.last_update_line:
                    continue
                if "--devices" in line:
                    words = line.split()
                    # The substring test above also matches tokens that merely
                    # contain "--devices"; only an exact token is counted.
                    if "--devices" in words:
                        self.gpus = 0
                        for w in words[words.index("--devices") + 1 :]:
                            try:
                                int(w)
                            except ValueError:
                                # First non-integer token ends the device list.
                                break
                            self.gpus += 1
                elif "] Ep. " in line and "[valid]" not in line:
                    self.parse_train(line)
                elif "[valid]" in line:
                    self.parse_valid(line)

        self.last_update_line = last_index
        print("last line id:", self.last_update_line)
        self.save_last_update()

    def update_all_avg(self):
        """Write new scores from the ``avg-*.log`` files as ``valid-avg`` scalars.

        This assumes some files like "avg-8.log" exist in the model directory
        (``job_path/prefix``). If not, this does nothing.

        Each line is split on whitespace. The step is the third ``-``-separated
        part of the first token and the score is the second token. Blank lines,
        lines without a parsable step, lines without a score and steps that
        were already written are skipped.
        """
        model_dir = os.path.join(self.job_path, self.prefix)
        for fn in os.listdir(model_dir):
            if not fn.startswith("avg-"):
                continue
            if not fn.endswith(".log"):
                continue
            name = fn.replace(".log", "")
            with open(os.path.join(model_dir, fn)) as f:
                if name not in self.avg_status:
                    self.avg_status[name] = -1
                for line in f:
                    fields = line.split()
                    if not fields:
                        # A blank line would otherwise break the unpacking below.
                        continue
                    label, *score = fields
                    try:
                        step = int(label.split("-")[2])
                    except (IndexError, ValueError):
                        continue
                    if step <= self.avg_status[name]:
                        continue
                    if not score:
                        continue
                    score = float(score[0])
                    self.writer.add_scalar("valid-avg/" + name + "_bleu", score, step)
                    self.avg_status[name] = step
@click.command()
@click.option("--dir")
@click.option("--prefix", default="")
def run(dir, prefix):
    """Continuously mirror the logs of all registered jobs into TensorBoard.

    Every five seconds the file "tb-monitored-jobs" in the current working
    directory is re-read; each non-empty line is taken as a job directory.
    A monitor is created for every new job whose train.log exists, monitors of
    jobs that are no longer listed are dropped, and all remaining monitors are
    updated. The loop never terminates on its own.

    Args:
        dir: value of ``--dir``; accepted but not used by this function.
        prefix: sub-directory inside each job directory that holds train.log.
    """
    monitors = {}
    while True:
        monitored = set()
        with open("tb-monitored-jobs", "r") as f:
            # create new monitors
            for line_raw in f:
                line = line_raw.strip()
                if not line:
                    # A blank line would otherwise be treated as the job path "",
                    # i.e. train.log in the working directory with "/tb" as output.
                    continue
                if line not in monitors:
                    path = os.path.join(line, prefix, "train.log")
                    if os.path.exists(path):
                        monitors[line] = JobMonitor(line, prefix)
                    else:
                        print("path %s does not exist, skipping" % (path))
                monitored.add(line)

        # delete unregistered monitors
        for k in list(monitors.keys()):
            if k not in monitored:
                del monitors[k]

        # update all monitors
        for j, m in monitors.items():
            print("update loop", j)
            m.update_loop()

        time.sleep(5)
# Script entry point: start the click command only when executed directly,
# not when the module is imported.
if __name__ == "__main__":
    run()