in ultravox/tools/mds_tool.py [0:0]
def convert(self, path: str) -> None:
    """Convert ``self._dataset`` into sharded output under ``output_dir/path``.

    Any existing directory at the destination is deleted first. The dataset's
    columns are mapped, split into ``self._args.num_groups`` tasks, and the
    tasks are processed by a pool of ``self._args.num_procs`` worker processes.
    Afterwards the index files under the destination are merged with
    ``streaming.base.util.merge_index``.

    Args:
        path: Destination sub-path, joined onto ``self._args.output_dir``.
    """
    data_dir = os.path.join(self._args.output_dir, path)
    # NOTE(review): if `path` is absolute, os.path.join discards output_dir and
    # the rmtree below targets `path` itself -- confirm callers only pass
    # relative paths.
    # Clean out any previous conversion so stale outputs are not merged in.
    if os.path.exists(data_dir):
        shutil.rmtree(data_dir)

    columns = self._map_columns(self._dataset.features)
    tasks = self._create_tasks(columns, data_dir, self._args.num_groups)
    print(
        f"Starting conversion, groups={self._args.num_groups}, procs={self._args.num_procs}"
    )
    # Process the tasks in parallel worker processes. Each worker call returns
    # a sample count; completion order is irrelevant to the total, so consume
    # results as soon as any worker finishes.
    with multiprocessing.Pool(
        initializer=self._init_worker, processes=self._args.num_procs
    ) as pool:
        n = sum(pool.imap_unordered(self._convert_worker, tasks))
    # The per-task outputs (presumably one index per group, written by the
    # workers -- confirm in _convert_worker) are combined into one index here.
    print("Merging indexes...")
    streaming.base.util.merge_index(data_dir, keep_local=True)
    print(f"Conversion completed, samples={n}, path={data_dir}")