in src/datatrove/executor/ray.py
def _run_for_rank(self, rank: int, local_rank: int = 0) -> PipelineStats:
    """
    Main executor method. Sets up logging, pipes data from each pipeline step to the next, saves statistics,
    and marks the task as completed.

    Args:
        rank: the rank to run the pipeline for
        local_rank: currently only used for logging;
            any task with local_rank != 0 will not print logs to the console.

    Returns: the stats for this task
    """
    import tempfile

    if self.is_rank_completed(rank):
        logger.info(f"Skipping {rank=} as it has already been completed.")
        return PipelineStats()

    # we log only locally and upload the logs to logging_dir once the pipeline has finished
    ray_logs_dir = get_datafolder(f"{tempfile.gettempdir()}/ray_logs")
    logfile = add_task_logger(ray_logs_dir, rank, local_rank)
    log_pipeline(self.pipeline)
    try:
        # pipe data from one step to the next
        pipelined_data = None
        for pipeline_step in self.pipeline:
            if callable(pipeline_step):
                pipelined_data = pipeline_step(pipelined_data, rank, self.world_size)
            elif isinstance(pipeline_step, Sequence) and not isinstance(pipeline_step, str):
                pipelined_data = pipeline_step
            else:
                raise ValueError(f"Pipeline step must be callable or a sequence, got {type(pipeline_step)}")
        if pipelined_data:
            # exhaust the final generator without buffering its output (see the sketch below)
            deque(pipelined_data, maxlen=0)
        logger.success(f"Processing done for {rank=}")

        # stats
        stats = PipelineStats(self.pipeline)
        with self.logging_dir.open(f"stats/{rank:05d}.json", "w") as f:
            stats.save_to_disk(f)
        logger.info(stats.get_repr(f"Task {rank}"))

        # completed
        self.mark_rank_as_completed(rank)
    except Exception as e:
        logger.exception(e)
        raise  # bare raise keeps the original traceback intact
    finally:
        close_task_logger(logfile)
        # copy logs from the local dir to logging_dir
        with (
            ray_logs_dir.open(f"logs/task_{rank:05d}.log", "r") as f,
            self.logging_dir.open(f"logs/task_{rank:05d}.log", "w") as f_out,
        ):
            f_out.write(f.read())
    return stats
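
Aside: `deque(pipelined_data, maxlen=0)` is the standard-library idiom for exhausting an iterator in O(1) memory, running every step's side effects without keeping any results. A minimal self-contained sketch (the `documents` generator is hypothetical, standing in for the last pipeline step's output):

from collections import deque

def documents():
    # hypothetical stand-in for the output of the last pipeline step
    for i in range(3):
        print(f"processing doc {i}")
        yield i

# consume the generator purely for its side effects;
# maxlen=0 discards every item immediately, so nothing is buffered
deque(documents(), maxlen=0)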
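
For context, a per-rank entry point like this is typically fanned out as one Ray task per rank. A simplified, hypothetical sketch of that dispatch pattern (the `run_task` wrapper and `world_size` value are illustrative, not datatrove's actual scheduling code):

import ray

ray.init()

@ray.remote
def run_task(rank: int, world_size: int) -> dict:
    # in the real executor this would call something like executor._run_for_rank(rank)
    return {"rank": rank, "world_size": world_size}

world_size = 4
# launch one task per rank and block until every task's stats come back
stats = ray.get([run_task.remote(rank, world_size) for rank in range(world_size)])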