in src/evaluate/module.py [0:0]
def _finalize(self):
"""Close all the writing process and load/gather the data
from all the nodes if main node or all_process is True.
"""
if self.writer is not None:
self.writer.finalize()
self.writer = None
    # Release the locks held by processes with process_id > 0 so that process 0
    # can acquire them to read and then delete the data
if self.filelock is not None and self.process_id > 0:
self.filelock.release()
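
    # In the distributed case, every non-main process has now flushed its Arrow
    # cache file and released its lock; process 0 re-acquires those locks in
    # _get_all_cache_files() below before reading and deleting the files
    # (see the sketches after this function).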
    if self.keep_in_memory:
        # Rebuild the dataset of predictions and references directly from the
        # in-memory buffer; no on-disk ArrowReader is needed in this branch
        self.data = Dataset.from_buffer(self.buf_writer.getvalue())
elif self.process_id == 0:
        # Acquire a lock on each node's cache file to be sure it has finished writing
file_paths, filelocks = self._get_all_cache_files()
# Read the predictions and references
try:
reader = ArrowReader(path="", info=DatasetInfo(features=self.selected_feature_format))
self.data = Dataset(**reader.read_files([{"filename": f} for f in file_paths]))
except FileNotFoundError:
raise ValueError(
"Error in finalize: another evaluation module instance is already using the local cache file. "
"Please specify an experiment_id to avoid collision between distributed evaluation module instances."
) from None
        # Store the file paths and locks; we release/delete them after the computation
self.file_paths = file_paths
self.filelocks = filelocks
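
For reference, the ArrowReader.read_files call above deserializes each per-process
cache file from Arrow's record-batch stream format. A minimal sketch of what reading
one such file back amounts to in plain pyarrow (the standalone helper below is
illustrative, not part of this module):

import pyarrow as pa

def read_cache_file(path: str) -> pa.Table:
    # Open the Arrow record-batch stream written by the per-process writer
    # and materialize all of its batches into a single in-memory table.
    with pa.input_stream(path) as source:
        return pa.ipc.open_stream(source).read_all()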
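
And a minimal sketch of the distributed usage that exercises this code path,
assuming the launcher exports RANK and WORLD_SIZE (the environment variable names
and the example data are assumptions, not part of the module):

import os

import evaluate

rank = int(os.environ.get("RANK", 0))              # set by the launcher (assumed)
world_size = int(os.environ.get("WORLD_SIZE", 1))  # set by the launcher (assumed)

metric = evaluate.load(
    "accuracy",
    num_process=world_size,
    process_id=rank,
    experiment_id="my_eval_run",  # avoids cache collisions between concurrent runs
)
metric.add_batch(predictions=[0, 1, 1], references=[0, 1, 0])
# compute() eventually calls _finalize(): non-main processes flush their Arrow
# file and release their lock; process 0 re-locks, reads, and aggregates.
result = metric.compute()  # a dict on process 0, None on the other processes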