def _read_event()

in smdebug/profiler/trace_event_file_parser.py [0:0]


    def _read_event(self, event, node_id=""):
        """Parse a single trace-event dict and record it on this parser.

        The event is dispatched on its ``"ph"`` (phase) field:

        * ``"M"``: handed to the process-info, thread-info and start-time
          populate helpers.
        * ``"X"``: converted to a ``TraceEvent`` and appended to
          ``self._trace_events``.
        * ``"B"``: pushed onto the per-pid stack in ``self._pid_stacks``.
        * ``"E"``: paired with the most recently pushed ``"B"`` event of the
          same pid; the pair is recorded as one ``TraceEvent`` of phase
          ``"X"`` whose duration is the difference of the two timestamps.

        Events without a ``"ph"`` key, and events with any other phase value,
        are ignored.

        Args:
            event: Mapping describing one trace event. Besides ``"ph"`` the
                code reads ``"ts"``, ``"dur"``, ``"name"``, ``"pid"`` and,
                optionally, ``"tid"`` and ``"args"``.
            node_id: Identifier of the node that produced the event. A value
                of the form ``<prefix>-algo<rest>`` is rewritten to
                ``algo<rest>-<prefix>`` before use. ``None`` is accepted and
                left untouched.

        Returns:
            None. Results are stored on ``self``.
        """
        if "ph" not in event:
            return

        # If node_id is in the form "pid-algo-i", swap it to "algo-i-pid" so
        # that events sort in a sensible order when viewed in a timeline.
        if node_id is not None:
            node_match = re.match(r"(.*)-(algo.*)", node_id)
            if node_match is not None:
                node_id = node_match.group(2) + "-" + node_match.group(1)

        phase_type = event["ph"]
        if phase_type == "M":
            self._populate_process_info_for_metaevent(event)
            self._populate_thread_info_for_metaevent(event, node_id)
            self._populate_start_time(event)
        elif phase_type == "X":
            # Original author notes both values are in nanoseconds -- TODO confirm.
            start_time = event["ts"] + self._start_timestamp
            dur = event["dur"]
            name = event["name"]
            pid = phase_pid = event["pid"]  # this is phase pid
            phase_tid = event["tid"] if "tid" in event else 0

            phase_name = "Unknown"
            if phase_pid in self._processes:
                phase_name = self._processes[phase_pid].name

            if (
                "tid" in event
                and phase_pid in self._processes
                and event["tid"] in self._processes[phase_pid]._threads
            ):
                phase_tid = event["tid"]
            else:
                # Thread is not registered for this process: let the helper
                # produce the tid to use (original comment: a unique tid
                # which is a hash of 0 + node_id).
                phase_tid = self._populate_thread_info_for_metaevent(
                    event, node_id=node_id, phase_tid_default=phase_tid, phase_name=phase_name
                )

            event_args = event["args"] if "args" in event else None
            tid = phase_tid
            if event_args:
                # TF detailed metrics emit the actual pid and thread id in args.
                if "pid" in event_args:
                    pid = event_args["pid"]
                if "thread_id" in event_args:
                    tid = event_args["thread_id"]

            # NOTE(review): indexing self._processes[phase_pid] assumes the
            # helper above registered the process when it was missing -- confirm.
            t_event = TraceEvent(
                start_time,
                name,
                dur,
                phase_pid,
                phase_tid,
                event_args,
                node_id,
                phase_type,
                file_type=self.type,
                event_phase=phase_name,
                pid=pid,
                tid=tid,
                process_info=self._processes[phase_pid],
            )
            self._trace_events.append(t_event)
        elif phase_type == "B":
            # Remember the begin event until the matching "E" event arrives.
            pid = event["pid"]
            if pid not in self._pid_stacks:
                self._pid_stacks[pid] = []
            self._pid_stacks[pid].append(event)
        elif phase_type == "E":
            pid = phase_pid = event["pid"]
            # An empty stack means every earlier "B" for this pid has already
            # been paired; treat it like a pid that was never seen instead of
            # failing on the [-1] lookup.
            if pid not in self._pid_stacks or not self._pid_stacks[pid]:
                self.logger.info(
                    f"Did not find the 'B' type event in the pid {pid} . Skipping event: {event}"
                )
                return

            b_event = self._pid_stacks[pid].pop()
            start_time = b_event["ts"] + self._start_timestamp
            end_time = event["ts"] + self._start_timestamp
            duration = end_time - start_time
            if duration < 0:
                # The "B" event stays popped; the mismatched pair is dropped.
                self.logger.error(
                    f"Error in reading the events 'B' and 'E' or trace file is corrupt: pid = "
                    f"{pid}, start_time = {b_event['ts']} end_time = {event['ts']} name = "
                    f"{b_event['name']}"
                )
                return

            name = b_event["name"]
            event_args = event["args"] if "args" in event else None
            # The tid comes from the "B" event, but only when the "E" event
            # also carries one. Fall back to "0" if the "B" event has no tid
            # rather than raising KeyError.
            phase_tid = tid = b_event.get("tid", "0") if "tid" in event else "0"
            phase_name = "Unknown"
            if phase_pid in self._processes:
                phase_name = self._processes[phase_pid].name

            if (
                "tid" in event
                and phase_pid in self._processes
                and tid in self._processes[phase_pid]._threads
            ):
                # NOTE(review): the "X" branch keeps the tid itself here,
                # whereas this stores the value held in _threads -- confirm
                # which one TraceEvent expects.
                phase_tid = self._processes[phase_pid]._threads[tid]
            else:
                # Thread is not registered for this process: let the helper
                # produce the tid to use (original comment: a unique tid
                # which is a hash of 0 + node_id).
                phase_tid = self._populate_thread_info_for_metaevent(
                    event, node_id=node_id, phase_tid_default=phase_tid, phase_name=phase_name
                )
            tid = phase_tid
            if event_args:
                # TF detailed metrics emit pid and thread_id; depending on the
                # file type the actual process id and thread id may live in args.
                if "pid" in event_args:
                    pid = event_args["pid"]
                if "thread_id" in event_args:
                    tid = event_args["thread_id"]

            # NOTE(review): indexing self._processes[phase_pid] assumes the
            # helper above registered the process when it was missing -- confirm.
            t_event = TraceEvent(
                start_time,
                name,
                duration,
                phase_pid,
                phase_tid,
                event_args,
                node_id,
                "X",
                file_type=self.type,
                event_phase=phase_name,
                pid=pid,
                tid=tid,
                process_info=self._processes[phase_pid],
            )
            self._trace_events.append(t_event)