in src/datatrove/pipeline/tokens/tokenizer.py
def write_unshuffled(self, data: DocumentsPipeline, filename: str):
    """Tokenize documents with the tokenizer in batches and write the unshuffled tokenized documents to a file.
    We also compute loss values if needed and save them.

    Args:
        data (DocumentsPipeline): the documents to process
        filename (str): the filename to use for the output file
    """
    from tokenizers import Encoding

    unshuff = TokenizedFile(
        self.output_folder
        if (not self.shuffle_documents and not self.shuffle_chunk_size) or not self.local_working_dir
        else self.local_working_dir,
        filename,
        save_index=self.save_index or self.shuffle_documents or self.shuffle_chunk_size,
        save_loss_metadata=self.save_loss_metadata,
        upload_block_size=self.upload_block_size,
        tokenizer_name_or_path=self.tokenizer_name_or_path,
        save_final_metadata=self.save_final_metadata,
        token_size=self.token_size,
    )
    # tokenize document's text in batches to go faster – we compute loss values independently if needed
    for batch in batched(data, self.batch_size):
        with self.track_time(unit="batch"):
            encoded_batch: list[Encoding] = self.tokenizer.encode_batch([document.text for document in batch])
            for document, encoded in zip(batch, encoded_batch):
                tokens = encoded.ids
                loss_values = self.get_loss_values(document, encoded)
                if loss_values is not None and len(loss_values) < len(tokens):
                    # crop final section without loss
                    tokens = tokens[: len(loss_values)]
                # write bytes to disk
                unshuff.write(tokens, loss_values)
                # save stats
                self.stat_update("tokens", value=len(tokens))
    unshuff.close()
    return unshuff
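
For context, the batching step above relies on the Hugging Face tokenizers API: encode_batch takes a list of strings and returns one Encoding per input, whose .ids attribute holds the token ids. Below is a minimal standalone sketch of that pattern; the batched helper is an illustrative stand-in for datatrove's own batching utility, not the real one, and the gpt2 tokenizer is only an example choice.

from itertools import islice

from tokenizers import Tokenizer


def batched(iterable, n):
    # yield successive lists of up to n items from any iterable (illustrative stand-in)
    it = iter(iterable)
    while chunk := list(islice(it, n)):
        yield chunk


tokenizer = Tokenizer.from_pretrained("gpt2")
docs = ["first document", "second document", "third document"]
for batch in batched(docs, 2):
    # encode_batch returns one Encoding per input string; .ids holds the token ids
    for encoded in tokenizer.encode_batch(batch):
        print(len(encoded.ids), encoded.ids[:5])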