in src/datatrove/pipeline/tokens/merger.py [0:0]
def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
"""Main method to run the merging of files.
world_size must be 1 for this pipeline step, since it merges the results of the previous parallel step.
Args:
data: DocumentsPipeline
The data to be processed, as a Generator, typically created by a Reader initial pipeline step
rank: int
The rank of the process
world_size: int
The total number of processes
"""
assert world_size == 1, "world_size must be 1 for DocumentTokenizerMerger"
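# each input shard consists of a .ds token file, a matching .ds.index file with document end offsets,
# and (if loss metadata was saved) a .ds.loss file with 1 byte per token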
datafiles = self.input_folder.list_files(glob_pattern="*.ds")
datafiles_index = self.input_folder.list_files(glob_pattern="*.ds.index")
datafiles_loss = (
self.input_folder.list_files(glob_pattern="*.ds.loss")
if self.save_loss_metadata
else ([None] * len(datafiles))
)
assert len(datafiles) == len(datafiles_index) == len(datafiles_loss), (
"Mismatch between number of .ds, .ds.index and/or .ds.loss files "
f"({len(datafiles)} vs {len(datafiles_index)} vs {len(datafiles_loss)})"
)
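# the first line of the accompanying .metadata file stores the tokenizer name and, optionally,
# the token size in bytes as "name|token_size"; token_size defaults to 2 bytes per token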
tokenizer_name_or_path, token_size = None, 2
if self.save_final_metadata:
if self.input_folder.isfile(f"{datafiles[0]}.metadata"):
with self.input_folder.open(f"{datafiles[0]}.metadata", "rt") as f:
tokenizer_name_or_path = f.read().splitlines()[0]
if "|" in tokenizer_name_or_path:
tokenizer_name_or_path, token_size = tokenizer_name_or_path.split("|")
token_size = int(token_size)
# doc ends are handled differently depending on whether we shuffle whole documents or fixed-size chunks that may mix parts of different documents
doc_ends = [load_doc_ends(self.input_folder.open(file, "rb")) for file in datafiles_index]
boundaries_to_load = doc_ends
windows_with_doc_ends = doc_ends
doc_ends_to_write = [iter([]) for _ in range(len(doc_ends))]
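# defaults above (no chunk shuffling): each read window is a single document and doc_ends_to_write
# stays empty, so write_bytes is later called without explicit document ends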
if self.shuffle_chunk_size:
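# chunk shuffling: read fixed windows of shuffle_chunk_size tokens (only complete chunks are kept),
# group the document ends per chunk, and re-base them to the start of their chunk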
boundaries_to_load = [
[(i + 1) * self.shuffle_chunk_size for i in range(file_doc_ends[-1] // self.shuffle_chunk_size)]
if len(file_doc_ends) > 0
else []
for file_doc_ends in doc_ends
]
windows_with_doc_ends = [
chunk_doc_ends(file_doc_ends, self.shuffle_chunk_size) for file_doc_ends in doc_ends
]
doc_ends_to_write = [
(
[b - windowi * self.shuffle_chunk_size for b in window]
for windowi, window in enumerate(file_windows)
)
for file_windows in windows_with_doc_ends
]
# dataloaders
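# each reader yields the raw bytes between consecutive boundaries: token_size bytes per token
# for the .ds files and 1 byte per token for the .ds.loss files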
token_inputs = list(
map(
partial(get_data_reader, nb_bytes=token_size),
self.input_folder.open_files(datafiles),
boundaries_to_load,
)
)
loss_inputs = (
list(
map(
partial(get_data_reader, nb_bytes=1),
self.input_folder.open_files(datafiles_loss),
boundaries_to_load,
)
)
if self.save_loss_metadata
else None
)
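# ordering is a sequence of input file ids, one entry per read window, deciding from which file
# the next window is taken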
ordering = self.get_ordering(windows_with_doc_ends)
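# open the first output shard; additional shards are opened in the loop when the per-file limit is hit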
file_ct = 0
output_file = TokenizedFile(
output_folder=self.output_folder,
filename=f"{file_ct:03d}_{self.save_filename}.ds",
save_loss_metadata=self.save_loss_metadata,
upload_block_size=self.upload_block_size,
tokenizer_name_or_path=tokenizer_name_or_path,
save_final_metadata=self.save_final_metadata,
token_size=token_size,
)
for input_file_id in tqdm(
ordering, desc="Merging documents", unit="documents", total=len(ordering), disable=not self.progress
):
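# stop once the optional global token budget is exhausted; roll over to a new output shard
# once the current one reaches max_tokens_per_file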
if 0 < self.max_tokens <= self.stats["tokens"].total:
break
if 0 < self.max_tokens_per_file <= len(output_file):
output_file.close()
file_ct += 1
output_file = TokenizedFile(
output_folder=self.output_folder,
filename=f"{file_ct:03d}_{self.save_filename}.ds",
save_loss_metadata=self.save_loss_metadata,
upload_block_size=self.upload_block_size,
tokenizer_name_or_path=tokenizer_name_or_path,
save_final_metadata=self.save_final_metadata,
token_size=token_size,
)
# copy tokens and loss
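# each next() call returns the bytes of the next window (document or chunk) from the chosen file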
tokens = next(token_inputs[input_file_id])
output_file.write_bytes(tokens, next(doc_ends_to_write[input_file_id], None))
if loss_inputs:
output_file.write_loss_bytes(next(loss_inputs[input_file_id]))
self.stat_update("tokens", value=len(tokens) // token_size)
# cleanup
output_file.close()
if self.save_final_metadata:
# save final total metadata file
output_file.write_final_metadata(self.stats["tokens"].total, filename=f"{self.save_filename}.ds")