toolkits/pretrain_data_preprocessing/preprocess_data.py (198 lines of code) (raw):
# Copyright (c) 2023 Alibaba PAI Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Processing data for pretraining."""
import argparse
import multiprocessing
import os
import sys
import time
from threading import Semaphore
import torch
import ftfy
import lm_dataformat as lmd
import tqdm
from megatron.core.datasets import indexed_dataset
from megatron_patch.tokenizer import build_tokenizer
sys.path.append(
os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
class Encoder(object):
def __init__(self, args):
self.args = args
def initializer(self):
# Use Encoder class as a container for global data
Encoder.tokenizer = build_tokenizer(self.args)
def encode(self, text):
if self.args.ftfy:
text = ftfy.fix_text(text)
ids = {}
for key in self.args.jsonl_keys:
doc_ids = []
try:
text_ids = Encoder.tokenizer(text, add_special_tokens=False, padding='do_not_pad',max_length=32768,truncation=True)['input_ids']
"""
text_ids = Encoder.tokenizer(text, add_special_tokens=False, padding='max_length',
max_length=2047, truncation=True)['input_ids']
"""
if max(text_ids) >= Encoder.tokenizer.vocab_size:
print(text)
print(max(text_ids))
continue
except Exception as e:
print(f"Error encoding text: {e}") # print error message
continue
if len(text_ids) > 0:
doc_ids.append(text_ids)
if self.args.append_eod:
if hasattr(Encoder.tokenizer, 'eos_token_id'):
doc_ids[-1].append(Encoder.tokenizer.eos_token_id)
elif hasattr(Encoder.tokenizer, 'eod_id'):
doc_ids[-1].append(Encoder.tokenizer.eod_id)
else:
doc_ids[-1].append(Encoder.tokenizer.eod)
#doc_ids[-1].append(Encoder.tokenizer.pad_token_id)
ids[key] = doc_ids
return ids, len(text)
def get_args():
parser = argparse.ArgumentParser()
group = parser.add_argument_group(title='input data')
group.add_argument('--input', type=str, required=True)
group.add_argument(
'--jsonl-keys',
nargs='+',
default=['content'],
help='space separate listed of keys to extract from jsonl. Defa',
)
group.add_argument(
'--num-docs',
default=None,
type=int,
)
group = parser.add_argument_group(title='tokenizer')
group.add_argument(
'--patch-tokenizer-type',
type=str,
required=True,
choices=[
'JiebaBPETokenizer', 'BloomTokenizerFromHF',
'ChatGLMTokenizerFromHF', 'GPT2BPETokenizer',
'GLM10BZHTokenizerFromHF', 'IcetkGLM130BTokenizer',
'LLamaTokenizer', 'FalconTokenizer', 'OPTTokenizer',
'StarcoderTokenizerFromHF', 'QwenTokenizer','Qwen2Tokenizer', 'MistralTokenizer'
],
help='What type of tokenizer to use.',
)
group.add_argument('--vocab-file',
type=str,
default=None,
help='Path to the vocab file')
group.add_argument(
'--merge-file',
type=str,
default=None,
help='Path to the BPE merge file (if necessary).',
)
group.add_argument(
'--append-eod',
action='store_true',
help='Append an <eod> token to the end of a document.',
)
group.add_argument('--ftfy',
action='store_true',
help='Use ftfy to clean text')
group = parser.add_argument_group(title='output data')
group.add_argument(
'--output-prefix',
type=str,
required=True,
help='Path to binary output file without suffix',
)
group.add_argument(
'--dataset-impl',
type=str,
default='mmap',
choices=['lazy', 'cached', 'mmap'],
help='Dataset implementation to use. Default: mmap',
)
group = parser.add_argument_group(title='runtime')
group.add_argument('--workers',
type=int,
default=1,
help='Number of worker processes to launch')
group.add_argument(
'--log-interval',
type=int,
default=100,
help='Interval between progress updates',
)
group.add_argument('--load',
type=str,
default=None,
help='path to tokenizer config file')
group.add_argument('--seq-length',
type=int,
default=2048,
help='sequence length')
group.add_argument('--extra-vocab-size',
type=int,
default=1,
help='extra_vocab_size')
args = parser.parse_args()
args.keep_empty = False
# some default/dummy values for the tokenizer
args.rank = 0
args.make_vocab_size_divisible_by = 128
args.model_parallel_size = 1
return args
def yield_from_files(fnames: list, semaphore):
def yielder(fname, semaphore):
for f in filter(lambda x: x, lmd.Reader(fname).stream_data()):
semaphore.acquire()
yield f
for fname in fnames:
semaphore.acquire()
yield from yielder(fname, semaphore)
def main():
args = get_args()
args.tensor_model_parallel_size = 1
args.rank = 0
args.make_vocab_size_divisible_by = 128
args.vocab_extra_ids = 0
encoder = Encoder(args)
tokenizer = build_tokenizer(args)
print(f'Vocab size: {tokenizer.vocab_size}')
print(f'Output prefix: {args.output_prefix}')
semaphore = Semaphore(10000 + args.workers)
# use multiprocessing to iterate over input documents
file_list = os.listdir(args.input)
path_list = [os.path.join(args.input, file) for file in file_list]
fin = yield_from_files(path_list, semaphore)
if args.workers > 1:
pool = multiprocessing.Pool(args.workers,
initializer=encoder.initializer)
encoded_docs = pool.imap(encoder.encode, fin, chunksize=25)
else:
encoder.initializer()
encoded_docs = (encoder.encode(doc) for doc in fin)
output_bin_files = {}
output_idx_files = {}
builders = {}
for key in args.jsonl_keys:
output_bin_files[key] = '{}_{}_{}.bin'.format(args.output_prefix, key,
'document')
output_idx_files[key] = '{}_{}_{}.idx'.format(args.output_prefix, key,
'document')
builders[key] = indexed_dataset.IndexedDatasetBuilder(
output_bin_files[key],
dtype=indexed_dataset.DType.optimal_dtype(tokenizer.vocab_size),
)
# actually do tokenization
proc_start = time.time()
total_bytes_processed = 0
pbar = tqdm.tqdm()
for i, (doc, bytes_processed) in enumerate(encoded_docs, start=1):
total_bytes_processed += bytes_processed
semaphore.release()
# add each tokenized document / sentence
for key, sentences in doc.items():
for sentence in sentences:
builders[key].add_item(torch.IntTensor(sentence))
# separate with eos token
builders[key].end_document()
# log progress
if i % args.log_interval == 0:
current = time.time()
elapsed = current - proc_start
mbs = total_bytes_processed / elapsed / 1024 / 1024
pbar.set_description(f'Processed {i} documents '
f' ({i / elapsed} docs/s, {mbs} MB/s).')
if i != 0:
pbar.update(args.log_interval)
# save output file
for key in args.jsonl_keys:
builders[key].finalize(output_idx_files[key])
if __name__ == '__main__':
main()