in src/evaluate/module.py [0:0]
def _get_all_cache_files(self) -> Tuple[List[str], List[FileLock]]:
    """Collect every process's cache file and hold a lock on each of them.

    With a single process the only cache file is ``self.cache_file_name``.
    Otherwise one file per process is looked up in ``self.data_dir`` under the
    name ``{experiment_id}-{num_process}-{process_id}.arrow``.

    The first file is paired with ``self.filelock``, which is reused as is and
    not acquired here. For every other file a ``FileLock`` on
    ``<file_path>.lock`` is acquired, waiting up to ``self.timeout`` seconds,
    so that the other processes have finished writing before the files are
    used.

    Returns:
        A ``(file_paths, filelocks)`` tuple; ``filelocks[i]`` is the lock
        associated with ``file_paths[i]``.

    Raises:
        ValueError: If there is a single process and no cache file has been
            created yet, or if a lock cannot be acquired within
            ``self.timeout``. In the latter case every lock acquired by this
            call is released before raising; ``self.filelock`` is left
            untouched.
    """
    if self.num_process == 1:
        if self.cache_file_name is None:
            raise ValueError(
                "Evaluation module cache file doesn't exist. Please make sure that you call `add` or `add_batch` "
                "at least once before calling `compute`."
            )
        file_paths = [self.cache_file_name]
    else:
        file_paths = [
            os.path.join(self.data_dir, f"{self.experiment_id}-{self.num_process}-{process_id}.arrow")
            for process_id in range(self.num_process)
        ]

    # Acquire a lock on each process's file to be sure it has finished writing.
    filelocks = []
    # Locks taken by this call only, so they can be rolled back on failure
    # without touching `self.filelock`, which is acquired outside this method.
    acquired_here = []
    for process_id, file_path in enumerate(file_paths):
        if process_id == 0:  # process 0 already has its lock file
            filelocks.append(self.filelock)
            continue

        filelock = FileLock(file_path + ".lock")
        try:
            filelock.acquire(timeout=self.timeout)
        except Timeout:
            # The caller never receives the locks when we raise, so release
            # the ones already held instead of leaving them locked.
            for held_lock in reversed(acquired_here):
                held_lock.release()
            raise ValueError(
                f"Cannot acquire lock on cached file {file_path} for process {process_id}."
            ) from None
        acquired_here.append(filelock)
        filelocks.append(filelock)

    return file_paths, filelocks