in smdebug/profiler/trace_event_file_parser.py [0:0]
def _read_event(self, event, node_id=""):
    """Parse a single trace event and record its effect on the parser state.

    Dispatches on the event's phase field ``"ph"``:

    * ``"M"`` (metadata): forwards the event to the process-info,
      thread-info and start-time helpers.
    * ``"X"`` (complete): builds a ``TraceEvent`` from the event's ``ts`` and
      ``dur`` and appends it to ``self._trace_events``.
    * ``"B"`` (begin): pushes the event on the per-pid stack in
      ``self._pid_stacks``.
    * ``"E"`` (end): pops the most recent ``"B"`` event for the same pid and
      appends a ``TraceEvent`` with phase ``"X"`` spanning the two timestamps.

    Events without a ``"ph"`` key, and events with any other phase, are
    ignored.

    Args:
        event: Mapping holding one trace event (``ph``, ``pid``, ``ts`` and,
            depending on the phase, ``dur``, ``name``, ``tid``, ``args``).
        node_id: Identifier of the node that emitted the event. A value of
            the form ``<prefix>-algo...`` is rewritten to
            ``algo...-<prefix>`` so that nodes sort well in a timeline view.
            May be ``None``.

    Returns:
        None. Results are stored on ``self``.
    """
    if "ph" not in event:
        return

    # If node_id is in the form pid-algo-i, swap it to algo-i-pid so that the
    # entries get a good sort order when viewed in a timeline.
    if node_id is not None:
        found_nodeids_parts = re.match("(.*)-(algo.*)", node_id)
        if found_nodeids_parts is not None:
            node_id = found_nodeids_parts[2] + "-" + found_nodeids_parts[1]

    phase_type = event["ph"]

    if phase_type == "M":
        self._populate_process_info_for_metaevent(event)
        self._populate_thread_info_for_metaevent(event, node_id)
        self._populate_start_time(event)

    elif phase_type == "X":
        # Original comment: start time and duration are in nanoseconds.
        # NOTE(review): unit is not verifiable from this block - confirm.
        start_time = event["ts"] + self._start_timestamp
        dur = event["dur"]
        name = event["name"]
        pid = phase_pid = event["pid"]  # this is the phase pid
        phase_tid = event["tid"] if "tid" in event else 0
        phase_name = "Unknown"
        if phase_pid in self._processes:
            phase_name = self._processes[phase_pid].name
        thread_is_known = (
            "tid" in event
            and phase_pid in self._processes
            and event["tid"] in self._processes[phase_pid]._threads
        )
        if not thread_is_known:
            # Generate a unique tid derived from the default tid and node_id.
            phase_tid = self._populate_thread_info_for_metaevent(
                event, node_id=node_id, phase_tid_default=phase_tid, phase_name=phase_name
            )
        event_args = event.get("args")
        tid = phase_tid
        if event_args:
            # TF detailed metrics emit the actual pid and thread_id in args.
            if "pid" in event_args:
                pid = event_args["pid"]
            if "thread_id" in event_args:
                tid = event_args["thread_id"]
        # NOTE(review): this lookup raises KeyError if phase_pid is still not
        # registered; presumably _populate_thread_info_for_metaevent registers
        # it - confirm.
        t_event = TraceEvent(
            start_time,
            name,
            dur,
            phase_pid,
            phase_tid,
            event_args,
            node_id,
            phase_type,
            file_type=self.type,
            event_phase=phase_name,
            pid=pid,
            tid=tid,
            process_info=self._processes[phase_pid],
        )
        self._trace_events.append(t_event)

    elif phase_type == "B":
        # Remember the begin event until the matching 'E' event arrives.
        self._pid_stacks.setdefault(event["pid"], []).append(event)

    elif phase_type == "E":
        pid = phase_pid = event["pid"]
        pending = self._pid_stacks.get(pid)
        # A missing stack and an already drained stack both mean there is no
        # open 'B' event to pair with; indexing an empty stack would raise.
        if not pending:
            self.logger.info(
                f"Did not find the 'B' type event in the pid {pid} . Skipping event: {event}"
            )
            return
        b_event = pending.pop()
        start_time = b_event["ts"] + self._start_timestamp
        end_time = event["ts"] + self._start_timestamp
        duration = end_time - start_time
        if duration < 0:
            self.logger.error(
                f"Error in reading the events 'B' and 'E' or trace file is corrupt: pid = "
                f"{pid}, start_time = {b_event['ts']} end_time = {event['ts']} name = "
                f"{b_event['name']}"
            )
            return
        name = b_event["name"]
        event_args = event.get("args")
        # The tid is read from the 'B' event, so it must be present there too;
        # checking only the 'E' event could raise KeyError.
        phase_tid = tid = b_event["tid"] if "tid" in event and "tid" in b_event else "0"
        phase_name = "Unknown"
        if phase_pid in self._processes:
            phase_name = self._processes[phase_pid].name
        if (
            "tid" in event
            and phase_pid in self._processes
            and tid in self._processes[phase_pid]._threads
        ):
            phase_tid = self._processes[phase_pid]._threads[tid]
        else:
            # Generate a unique tid derived from the default tid and node_id.
            phase_tid = self._populate_thread_info_for_metaevent(
                event, node_id=node_id, phase_tid_default=phase_tid, phase_name=phase_name
            )
            tid = phase_tid
        if event_args:
            # TF detailed metrics emit pid and thread_id. Depending on the
            # file type, the actual process id and thread id may be in args.
            if "pid" in event_args:
                pid = event_args["pid"]
            if "thread_id" in event_args:
                tid = event_args["thread_id"]
        # The B/E pair is stored as a single complete ("X") event.
        t_event = TraceEvent(
            start_time,
            name,
            duration,
            phase_pid,
            phase_tid,
            event_args,
            node_id,
            "X",
            file_type=self.type,
            event_phase=phase_name,
            pid=pid,
            tid=tid,
            process_info=self._processes[phase_pid],
        )
        self._trace_events.append(t_event)