def write_event()

in smdebug/core/tfevent/timeline_file_writer.py [0:0]


    def write_event(self, record):
        """Append one trace event to the current timeline file.

        If a file is already open and ``_should_rotate_now`` reports that a
        rotation policy is satisfied for the event's end timestamp, the file
        is closed first. A new file is then opened whenever no writer is
        active. Metadata events are written ahead of the record itself: two
        for pid 0 while ``is_first`` is set, and two per training phase when
        the phase has no non-zero entry in ``tensor_table`` (skipped for
        instant events, i.e. ``record.phase == "i"``).

        Args:
            record: The ``TimelineRecord`` to serialize. Its ``pid`` attribute
                is overwritten with the index assigned to its training phase.

        Returns:
            The value returned by ``self._writer.write`` for the record line,
            or ``None`` when the file could not be opened and the event was
            dropped.

        Raises:
            TypeError: If ``record`` is not a ``TimelineRecord``.
        """
        if not isinstance(record, TimelineRecord):
            raise TypeError("expected a TimelineRecord, but got %s" % type(record))
        self._num_outstanding_events += 1

        def write_metadata(name, pid, args):
            # Metadata events carry phase "M"; the key order is kept stable so
            # the serialized line is identical for identical inputs.
            entry = {"name": name, "ph": "M", "pid": pid, "args": args}
            self._writer.write(json.dumps(entry) + ",\n")

        # Rotation policy:
        # Close file if file size exceeds $ENV_MAX_FILE_SIZE or folder was created
        # more than $ENV_CLOSE_FILE_INTERVAL time duration.
        end_time_for_event_in_us = record.event_end_ts_micros

        # Check if any of the rotation policies have been satisfied. If so, close
        # the existing trace file; a new one is opened just below.
        # policy 1: if file size exceeds specified max_size
        # policy 2: if same file has been written to for close_interval time
        # policy 3: if a write is being made in the next hour, create a new directory
        if self._writer and self._should_rotate_now(end_time_for_event_in_us):
            self.close()

        # If the file has not been created yet (or was just rotated), create it now.
        if not self._writer:
            file_opened = self.open(path=self.name(), cur_event_end_time=end_time_for_event_in_us)
            if not file_opened:
                file_open_fail_threshold = (
                    self._profiler_config_parser.config.trace_file.file_open_fail_threshold
                )
                # continuous_fail_count is only read here; presumably open()
                # maintains it - confirm against open().
                if self.continuous_fail_count >= file_open_fail_threshold:
                    logger.warning(
                        "Encountered {} number of continuous failures while trying to open the file. "
                        "Marking the writer unhealthy. All future events will be dropped.".format(
                            str(file_open_fail_threshold)
                        )
                    )
                    self._healthy = False
                # The event is dropped: nothing is written and None is returned.
                return

        # First write the metadata events for pid 0.
        if self.is_first:
            write_metadata(
                "process_name", 0, {"start_time_since_epoch_in_micros": record.base_start_time}
            )
            write_metadata("process_sort_index", 0, {"sort_index": 0})
            self.is_first = False

        # NOTE(review): tensor_table is indexed before any assignment, so it is
        # presumably a defaultdict(int) - confirm. A phase whose assigned index
        # is 0 cannot be told apart from an unseen phase by this check, so this
        # branch (and its metadata writes) would run again for it on every event.
        if self.tensor_table[record.training_phase] == 0:
            # Get the tensor_idx from the master table; if it is missing, create
            # one and add it to the master table.
            if record.training_phase in self.training_phase_to_pid:
                tensor_idx = self.training_phase_to_pid[record.training_phase]
            else:
                tensor_idx = len(self.training_phase_to_pid)
                self.training_phase_to_pid[record.training_phase] = tensor_idx

            self.tensor_table[record.training_phase] = tensor_idx

            # Instant events don't have a training phase
            if record.phase != "i":
                write_metadata("process_name", tensor_idx, {"name": record.training_phase})
                write_metadata("process_sort_index", tensor_idx, {"sort_index": tensor_idx})

        record.pid = self.tensor_table[record.training_phase]

        # Write the trace event record itself, then flush.
        position_and_length_of_record = self._writer.write(record.to_json() + ",\n")
        self.flush()
        # Track the latest end timestamp seen so far.
        if record.event_end_ts_micros > self.last_event_end_time_in_us:
            self.last_event_end_time_in_us = record.event_end_ts_micros
        return position_and_length_of_record