in smdebug/core/tfevent/timeline_file_writer.py [0:0]
def write_event(self, record):
    """Append a trace event to the current trace file.

    Rotation policy — before writing, the current file is closed (and a new
    one opened lazily on the next write) if any of these hold:
      1. the file size exceeds $ENV_MAX_FILE_SIZE,
      2. the file/folder is older than the $ENV_CLOSE_FILE_INTERVAL duration,
      3. the event's end time falls into the next hour (new directory).

    Args:
        record: the ``TimelineRecord`` to serialize.

    Returns:
        Whatever the underlying writer's ``write`` returns for the event
        record (position and length of the written record), or ``None`` if
        the event was dropped because the trace file could not be opened.

    Raises:
        TypeError: if ``record`` is not a ``TimelineRecord``.
    """
    if not isinstance(record, TimelineRecord):
        # BUGFIX: the original message had a double space produced by
        # accidental adjacent-literal concatenation (", " " but").
        raise TypeError("expected a TimelineRecord, but got %s" % type(record))
    self._num_outstanding_events += 1

    end_time_for_event_in_us = record.event_end_ts_micros

    # Close the existing trace file if any rotation policy is satisfied;
    # a fresh file is opened below on the next write.
    if self._writer and self._should_rotate_now(end_time_for_event_in_us):
        self.close()

    # Lazily (re)open the trace file if it has not been created yet.
    if not self._writer:
        file_opened = self.open(path=self.name(), cur_event_end_time=end_time_for_event_in_us)
        if not file_opened:
            file_open_fail_threshold = (
                self._profiler_config_parser.config.trace_file.file_open_fail_threshold
            )
            if self.continuous_fail_count >= file_open_fail_threshold:
                logger.warning(
                    "Encountered {} number of continuous failures while trying to open the file. "
                    "Marking the writer unhealthy. All future events will be dropped.".format(
                        str(file_open_fail_threshold)
                    )
                )
                self._healthy = False
            # Drop this event; nothing was written.
            return

    # The very first write emits the pid-0 process metadata events
    # (Chrome trace "M"-phase records) carrying the base start time.
    if self.is_first:
        args = {"start_time_since_epoch_in_micros": record.base_start_time}
        json_dict = {"name": "process_name", "ph": "M", "pid": 0, "args": args}
        self._writer.write(json.dumps(json_dict) + ",\n")
        args = {"sort_index": 0}
        json_dict = {"name": "process_sort_index", "ph": "M", "pid": 0, "args": args}
        self._writer.write(json.dumps(json_dict) + ",\n")
        self.is_first = False

    # NOTE(review): a training phase whose assigned pid is 0 re-enters this
    # branch on every event, re-emitting its metadata — confirm whether
    # training_phase_to_pid is pre-seeded so that 0 is never handed out here.
    if self.tensor_table[record.training_phase] == 0:
        # Get the pid from the master table; if this phase is new, assign
        # the next index and record it in the master table.
        if record.training_phase in self.training_phase_to_pid:
            tensor_idx = self.training_phase_to_pid[record.training_phase]
        else:
            tensor_idx = len(self.training_phase_to_pid)
            self.training_phase_to_pid[record.training_phase] = tensor_idx
        self.tensor_table[record.training_phase] = tensor_idx

        # Instant events ("i" phase) don't have a training phase, so no
        # per-phase process metadata is emitted for them.
        if record.phase != "i":
            args = {"name": record.training_phase}
            json_dict = {"name": "process_name", "ph": "M", "pid": tensor_idx, "args": args}
            self._writer.write(json.dumps(json_dict) + ",\n")
            args = {"sort_index": tensor_idx}
            json_dict = {
                "name": "process_sort_index",
                "ph": "M",
                "pid": tensor_idx,
                "args": args,
            }
            self._writer.write(json.dumps(json_dict) + ",\n")

    record.pid = self.tensor_table[record.training_phase]

    # Write the trace event record itself and flush to disk.
    position_and_length_of_record = self._writer.write(record.to_json() + ",\n")
    self.flush()

    # Track the latest event end time seen so far (consulted by rotation).
    if record.event_end_ts_micros > self.last_event_end_time_in_us:
        self.last_event_end_time_in_us = record.event_end_ts_micros
    return position_and_length_of_record