in src/datatrove/executor/base.py
def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
    """
    Main executor method: sets up logging, pipes data from each pipeline step to the next, saves statistics
    and marks the task as completed.

    Args:
        rank: the rank that we want to run the pipeline for
        local_rank: at the moment this is only used for logging.
            Any task with local_rank != 0 will not print logs to console.

    Returns: the stats for this task
    """
    if self.is_rank_completed(rank):
        logger.info(f"Skipping {rank=} as it has already been completed.")
        return PipelineStats()
    logfile = add_task_logger(self.logging_dir, rank, local_rank)
    log_pipeline(self.pipeline)
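    # optionally sleep for a random delay to stagger task start times (e.g. so that
    # many tasks do not hit shared storage or an API at the same instant)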
    if self.randomize_start_duration > 0:
        time.sleep(random.randint(0, self.randomize_start_duration))
    try:
        # pipe data from one step to the next
        pipelined_data = None
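        # each callable step receives the previous step's output (None for the first
        # step) along with this task's rank and the total world_size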
        for pipeline_step in self.pipeline:
            if callable(pipeline_step):
                pipelined_data = pipeline_step(pipelined_data, rank, self.world_size)
            elif isinstance(pipeline_step, Sequence) and not isinstance(pipeline_step, str):
                pipelined_data = pipeline_step
            else:
                raise ValueError(f"Pipeline step of type {type(pipeline_step)} is neither callable nor a sequence of documents")
        if pipelined_data:
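            # steps are typically lazy generators: draining the final iterator with a
            # zero-length deque drives the whole pipeline without keeping its output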
            deque(pipelined_data, maxlen=0)
        logger.success(f"Processing done for {rank=}")
        # aggregate per-step statistics and write them to a per-rank JSON file
        stats = PipelineStats(self.pipeline)
        with self.logging_dir.open(f"stats/{rank:05d}.json", "w") as f:
            stats.save_to_disk(f)
        logger.info(stats.get_repr(f"Task {rank}"))
        # mark this rank as completed so that reruns skip it (see is_rank_completed above)
        self.mark_rank_as_completed(rank)
    except Exception as e:
        logger.exception(e)
        raise e
    finally:
        close_task_logger(logfile)
    return stats
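
For context, here is a minimal, self-contained sketch of the piping protocol this loop relies on. The step functions (read_docs, uppercase, write_docs) and the documents are invented for illustration and are not part of datatrove's API; the point is only that a step is either a sequence of documents or a callable taking (data, rank, world_size) and returning a new iterator.

from collections import deque
from collections.abc import Sequence

def read_docs(data, rank, world_size):
    # hypothetical "reader": ignores incoming data and yields this rank's shard
    for i in range(rank, 10, world_size):
        yield f"doc-{i}"

def uppercase(data, rank, world_size):
    # hypothetical "processor": lazily transforms documents flowing through it
    for doc in data:
        yield doc.upper()

def write_docs(data, rank, world_size):
    # hypothetical "writer": consumes documents and re-yields them for later steps
    for doc in data:
        print(f"[rank {rank}] wrote {doc}")
        yield doc

pipeline = [read_docs, uppercase, write_docs]
world_size = 2
for rank in range(world_size):
    pipelined_data = None
    for pipeline_step in pipeline:
        if callable(pipeline_step):
            pipelined_data = pipeline_step(pipelined_data, rank, world_size)
        elif isinstance(pipeline_step, Sequence) and not isinstance(pipeline_step, str):
            pipelined_data = pipeline_step
    if pipelined_data:
        deque(pipelined_data, maxlen=0)  # drain the chain, discarding output

Because of the Sequence branch, a plain list of documents can also stand in for the first step, e.g. pipeline = [["doc a", "doc b"], uppercase, write_docs].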