in src/evaluate/module.py [0:0]
def _init_writer(self, timeout=1):
if self.num_process > 1:
if self.process_id == 0:
file_path = os.path.join(self.data_dir, f"{self.experiment_id}-{self.num_process}-rdv.lock")
self.rendez_vous_lock = FileLock(file_path)
try:
self.rendez_vous_lock.acquire(timeout=timeout)
except TimeoutError:
raise ValueError(
f"Error in _init_writer: another evalution module instance is already using the local cache file at {file_path}. "
f"Please specify an experiment_id (currently: {self.experiment_id}) to avoid collision "
f"between distributed evaluation module instances."
) from None
if self.keep_in_memory:
self.buf_writer = pa.BufferOutputStream()
self.writer = ArrowWriter(
features=self.selected_feature_format, stream=self.buf_writer, writer_batch_size=self.writer_batch_size
)
else:
self.buf_writer = None
# Get cache file name and lock it
if self.cache_file_name is None or self.filelock is None:
cache_file_name, filelock = self._create_cache_file() # get ready
self.cache_file_name = cache_file_name
self.filelock = filelock
self.writer = ArrowWriter(
features=self.selected_feature_format,
path=self.cache_file_name,
writer_batch_size=self.writer_batch_size,
)
# Setup rendez-vous here if
if self.num_process > 1:
if self.process_id == 0:
self._check_all_processes_locks() # wait for everyone to be ready
self.rendez_vous_lock.release() # let everyone go
else:
self._check_rendez_vous() # wait for master to be ready and to let everyone go