in tb_plugin/torch_tb_profiler/profiler/loader.py [0:0]
def load(self):
    # Discover worker trace files in the run directory and remember, per
    # worker, which profiling spans were found.
    workers = []
    spans_by_workers = defaultdict(list)
    for path in io.listdir(self.run_dir):
        if io.isdir(io.join(self.run_dir, path)):
            continue
        match = consts.WORKER_PATTERN.match(path)
        if not match:
            continue
        worker = match.group(1)
        span = match.group(2)
        if span is not None:
            # remove the starting dot (.)
            span = span[1:]
            bisect.insort(spans_by_workers[worker], span)
        workers.append((worker, span, path))

    # Map each (worker, span) pair to a 1-based index in sorted span order.
    span_index_map = {}
    for worker, span_array in spans_by_workers.items():
        for i, span in enumerate(span_array, 1):
            span_index_map[(worker, span)] = i

    # Parse each trace file in its own process.
    for worker, span, path in workers:
        # convert the span timestamp to the index.
        span_index = None if span is None else span_index_map[(worker, span)]
        p = Process(target=self._process_data, args=(worker, span_index, path))
        p.start()
    logger.info("started all processing")

    # Collect exactly one result per spawned process from the shared queue.
    distributed_run = Run(self.run_name, self.run_dir)
    run = Run(self.run_name, self.run_dir)
    num_items = len(workers)
    while num_items > 0:
        item = self.queue.get()
        num_items -= 1
        r, d = item
        if r or d:
            logger.debug("Loaded profile via mp.Queue")
        if r is not None:
            run.add_profile(r)
        if d is not None:
            distributed_run.add_profile(d)

    # Build distributed profiles from the per-worker distributed data and
    # add them to the run.
    distributed_profiles = self._process_spans(distributed_run)
    for d in distributed_profiles:
        if d is not None:
            run.add_profile(d)

    # The child processes are not daemonic, so there is no need to join them
    # explicitly here; they are joined automatically.
    return run
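
Each spawned process is expected to put exactly one (profile, distributed_profile) tuple onto self.queue, which is what lets the counting loop above terminate. Below is a minimal, self-contained sketch of that fan-out/fan-in handshake; parse_trace, the dict payloads, and the file names are illustrative stand-ins, not the plugin's actual _process_data or its profile data types.

# Sketch of the fan-out/fan-in pattern used by load(), with hypothetical
# stand-in names; assumes nothing beyond the standard library.
from multiprocessing import Process, Queue


def parse_trace(worker, span_index, path, queue):
    # Stand-in for the per-worker parsing step: build a result (or None on
    # failure) and report it back to the parent via the shared queue.
    try:
        profile = {"worker": worker, "span": span_index, "path": path}
        distributed_profile = None  # only produced for distributed traces
        queue.put((profile, distributed_profile))
    except Exception:
        # Always put exactly one item; otherwise the parent's num_items
        # countdown would block forever on queue.get().
        queue.put((None, None))


if __name__ == "__main__":
    queue = Queue()
    workers = [("worker0", 1, "worker0.1234.pt.trace.json"),
               ("worker1", 1, "worker1.1234.pt.trace.json")]

    # Fan out: one process per trace file, mirroring the load() loop above.
    for worker, span_index, path in workers:
        Process(target=parse_trace, args=(worker, span_index, path, queue)).start()

    # Fan in: collect exactly one result per started process.
    for _ in range(len(workers)):
        profile, distributed_profile = queue.get()
        print(profile, distributed_profile)

Putting a (None, None) sentinel on the queue when parsing fails keeps the parent's countdown accurate even if a worker process hits an error.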