in smdebug/profiler/hvd_trace_file_rotation.py [0:0]
def _read_write_loop(self):
    """
    Reader thread to constantly read the large Horovod trace file and write
    events with Timeline File Writer.

    Every pass reopens ``self.hvd_file``, resumes at ``self.file_seek_pos`` and
    handles one JSON event per line:

    * the first event carrying a ``"ts"`` field fixes
      ``self._base_timestamp_in_us`` (via
      ``self._convert_monotonic_to_epoch_time``); the same value is pushed to
      the timeline writer with ``_update_base_start_time``;
    * metadata events (``"ph" == "M"``) only record ``args["name"]`` as the
      training phase of their ``pid``;
    * every other event is parsed with ``self._parse_trace_event`` and handed
      to ``self.tl_writer.write_trace_events``.

    Lines that are not valid JSON, are not JSON objects, or lack a field this
    loop needs are skipped. A trailing line without a newline is treated as
    still being written: it is left in the file and picked up by a later pass.

    The loop returns once ``self._stopper`` is set. The check runs after every
    pass, including passes where the file could not be opened; otherwise the
    thread sleeps 15 seconds before the next pass.
    """
    # Let the loop continuously run until stop event is set on smdebug hook cleanup
    while True:
        try:
            # Binary mode keeps positions as plain byte offsets, so the resume
            # point can be advanced one complete line at a time (text-mode
            # tell() is unavailable while iterating over the file).
            with open(self.hvd_file, "rb") as json_data:
                # resume from the position up to which the reader thread has
                # read; in binary mode seek() returns the new absolute offset.
                consumed_upto = json_data.seek(self.file_seek_pos)

                for raw_line in json_data:
                    if not raw_line.endswith(b"\n"):
                        # The producer is in the middle of writing this line.
                        # Do not consume it: advancing past it would drop the
                        # event and make the next pass start mid-line.
                        break
                    consumed_upto += len(raw_line)

                    try:
                        # Events are written one per line, usually followed by
                        # a comma; strip the line ending and that separator.
                        # NOTE(review): decodes as UTF-8 instead of the locale
                        # default the text-mode open used - confirm acceptable.
                        line = raw_line.decode("utf-8").rstrip("\r\n")
                        if line.endswith(","):
                            line = line[:-1]
                        event = json.loads(line)
                        if not isinstance(event, dict):
                            # valid JSON but not an event object, skip
                            continue

                        # the timestamp of the 1st event is considered as base timestamp
                        if self._base_timestamp_in_us is None and "ts" in event:
                            # this is the base timestamp that will be used by
                            # timeline file writer as well.
                            self._base_timestamp_in_us = self._convert_monotonic_to_epoch_time(
                                event["ts"]
                            )
                            # Hvd base timestamp might be earlier than timeline
                            # writer's base start time. Update timeline writer and
                            # the writer thread to avoid negative relative
                            # timestamp in the rotated files.
                            self.tl_writer._update_base_start_time(self._base_timestamp_in_us)

                        if event["ph"] == "M":
                            # the name mentioned in metadata events is used as
                            # training_phase in TimelineRecord; make a note of it.
                            # Timeline File Writer will take care of writing the
                            # metadata event for each event.
                            if "name" in event["args"]:
                                self.training_phase[event["pid"]] = event["args"]["name"]
                            continue

                        # parse the event JSON string
                        op_name, timestamp_in_secs, duration, pid, args = self._parse_trace_event(
                            event
                        )
                        # write complete, duration, and instant events
                        self.tl_writer.write_trace_events(
                            training_phase=self.training_phase[pid],
                            op_name=op_name,
                            phase=event["ph"],
                            timestamp=timestamp_in_secs,
                            duration=duration,
                            **args,
                        )
                    except (ValueError, KeyError):
                        # invalid JSON string (ValueError, which also covers
                        # undecodable bytes) or an event missing a required
                        # field / known pid (KeyError): skip the line instead
                        # of letting the exception end the reader thread.
                        pass

                # update file seek position for the next read
                self.file_seek_pos = max(self.file_seek_pos, consumed_upto)
        except OSError:
            # unable to open or read the timeline file, try again on the next pass
            pass

        # Checked outside the try block so that a missing/unreadable file cannot
        # keep the thread alive after the stop event has been set.
        if self._stopper.is_set():
            break

        time.sleep(15)