megatron_patch/tokenizer/icetk_glm130b_tokenizer.py (273 lines of code) (raw):

# Copyright (c) 2023 Alibaba PAI Team. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os from abc import ABC, abstractmethod from typing import List, Union import icetk.sentencepiece_model_pb2 as sp_model from icetk.text_tokenizer import TextTokenizer from icetk.utils import auto_create logger = logging.getLogger(__name__) class AbstractTokenizer(ABC): """Abstract class for tokenizer.""" def __init__(self, name): self.name = name super().__init__() @property @abstractmethod def vocab_size(self): pass @property @abstractmethod def vocab(self): """Dictionary from vocab text token to id token.""" pass @property @abstractmethod def inv_vocab(self): """Dictionary from vocab id token to text token.""" pass @abstractmethod def tokenize(self, text): pass def detokenize(self, token_ids): raise NotImplementedError('detokenizer is not implemented for {} ' 'tokenizer'.format(self.name)) @property def cls(self): raise NotImplementedError('CLS is not provided for {} ' 'tokenizer'.format(self.name)) @property def sep(self): raise NotImplementedError('SEP is not provided for {} ' 'tokenizer'.format(self.name)) @property def pad(self): raise NotImplementedError('PAD is not provided for {} ' 'tokenizer'.format(self.name)) @property def eod(self): raise NotImplementedError('EOD is not provided for {} ' 'tokenizer'.format(self.name)) @property def mask(self): raise NotImplementedError('MASK is not provided for {} ' 'tokenizer'.format(self.name)) class GLM130BTokenizer: def __init__( self, path='~/.icetk_models', max_blank_length=80, byte_fallback=True, ): if path is not None: self.path = os.path.expanduser(path) self.special_tokens = [ '[MASK]', '[gMASK]', '[sMASK]', '<eod>', '<sop>', '<eop>', '<ENC>', '<dBLOCK>' ] self.max_blank_length = max_blank_length self.byte_fallback = byte_fallback @staticmethod def _configure_tokenizer( text_tokenizer: TextTokenizer, special_tokens: List[str], max_blank_length: int, byte_fallback: bool, encode_special_tokens=False, ): """ Configure the text tokenizer with special tokens, whitespace tokens, and byte fallback tokens. Args: text_tokenizer (TextTokenizer): The text tokenizer to configure. special_tokens (List[str]): The list of special tokens to add to the tokenizer. max_blank_length (int): The maximum length of whitespace tokens to add to the tokenizer. byte_fallback (bool): Whether to add byte fallback tokens to the tokenizer. encode_special_tokens (bool): Whether to encode the special tokens as a separate token type. """ # special token special_token_type = 4\ if encode_special_tokens else 3 for token in special_tokens: text_tokenizer.proto.pieces.append( sp_model.ModelProto.SentencePiece(piece=token, score=0.0, type=special_token_type)) # whitespaces for token in [GLM130BTokenizer.get_tab_token()] + [ GLM130BTokenizer.get_blank_token(i) for i in range(2, max_blank_length + 1) ]: text_tokenizer.proto.pieces.append( sp_model.ModelProto.SentencePiece(piece=token, score=0.0, type=4)) # byte fallback if byte_fallback: text_tokenizer.proto.trainer_spec.byte_fallback = True for i in range(256): text_tokenizer.proto.pieces.append( sp_model.ModelProto.SentencePiece( piece='<0x{:02X}>'.format(i), score=0.0, type=6)) # text_tokenizer.refresh() def _get_text_tokenizer(self, encode_special_tokens=False): name = '_special_text_tokenizer' if\ encode_special_tokens else '_text_tokenizer' if not hasattr(self, name): fp = os.path.join(self.path, 'ice_text.model') auto_create(fp) tokenizer = TextTokenizer(fp) self._configure_tokenizer(tokenizer, self.special_tokens, self.max_blank_length, self.byte_fallback, encode_special_tokens) setattr(self, name, tokenizer) return getattr(self, name) @staticmethod def get_blank_token(length: int): assert length >= 2 return '<|blank_{length}|>' @staticmethod def get_tab_token(): return '<|tab|>' @property def text_tokenizer(self): return self._get_text_tokenizer(encode_special_tokens=False) @property def special_text_tokenizer(self): return self._get_text_tokenizer(encode_special_tokens=True) @property def num_image_tokens(self): return 20000 @property def num_text_tokens(self): return (self.text_tokenizer.num_tokens + len(self.special_tokens) + (self.max_blank_length - 2) + (256 if self.byte_fallback else 0)) @property def num_tokens(self): return self.num_image_tokens + self.num_text_tokens @staticmethod def _encode_whitespaces(text: str, max_len: int = 80): text = text.replace('\t', GLM130BTokenizer.get_tab_token()) for i in range(max_len, 1, -1): text = text.replace(' ' * i, GLM130BTokenizer.get_blank_token(i)) return text def _preprocess(self, text: str, linebreak=True, whitespaces=True): if linebreak: text = text.replace('\n', '<n>') if whitespaces: text = self._encode_whitespaces(text, max_len=self.max_blank_length) return text def encode(self, text: str, linebreak=True, whitespaces=True, special_tokens=False, add_dummy_prefix=True) -> List[int]: text = self._preprocess(text, linebreak, whitespaces) if not add_dummy_prefix: text = '<n>' + text tmp = self._get_text_tokenizer( encode_special_tokens=special_tokens).encode(text) tokens = [x + self.num_image_tokens for x in tmp] return tokens if add_dummy_prefix else tokens[2:] def decode(self, text_ids: List[int], special_tokens=False) -> str: ids = [int(_id) - self.num_image_tokens for _id in text_ids] text = self._get_text_tokenizer( encode_special_tokens=special_tokens).decode(ids) text = text.replace('<n>', '\n') text = text.replace(GLM130BTokenizer.get_tab_token(), '\t') for i in range(2, self.max_blank_length + 1): text = text.replace(self.get_blank_token(i), ' ' * i) return text def tokenize(self, text: str, linebreak=True, whitespaces=True, special_tokens=False, add_dummy_prefix=True) -> List[str]: text = self._preprocess(text, linebreak, whitespaces) if not add_dummy_prefix: text = '<n>' + text tokens = self._get_text_tokenizer( encode_special_tokens=special_tokens).tokenize(text) return tokens if add_dummy_prefix else tokens[2:] def __getitem__(self, x: Union[int, str]): if isinstance(x, int): if x < self.num_image_tokens: return '<image_{}>'.format(x) else: return self.text_tokenizer.convert_id_to_token( x - self.num_image_tokens) elif isinstance(x, str): if x.startswith('<image_') and x.endswith( '>') and x[7:-1].isdigit(): return int(x[7:-1]) else: return self.text_tokenizer.convert_token_to_id( x) + self.num_image_tokens else: raise ValueError('The key should be str or int.') class _IceTokenizer(AbstractTokenizer): """Hardcoded tokenizer.""" def __init__(self, max_blank_len=80): name = 'IceTokenizer' super().__init__(name) self.tokenizer = GLM130BTokenizer() self.num_tokens = 150000 self.add_special_tokens([ '[MASK]', '[gMASK]', '[sMASK]', 'eod', 'sop', 'eop', 'ENC', 'dBLOCK' ] + ['<t>'] + [f'<blank_{i}>' for i in range(2, max_blank_len + 1)]) self.sentence_end_decoder = { 20007: '.', 20031: '?', 20035: '!', 20027: ';', 20012: ':', 83823: '。', 145670: '…' } self.special_tokens['eos'] = 20002 self.special_tokens_decoder[20002] = '</s>' def add_special_tokens(self, special_tokens): """Add a list of additional tokens to the encoder. The additional tokens are indexed starting from the last index of the current vocabulary in the order of the `special_tokens` list. """ if not special_tokens: self.special_tokens = {} self.special_tokens_decoder = {} return self.special_tokens = dict( (tok, self.num_tokens + i) for i, tok in enumerate(special_tokens)) self.special_tokens_decoder = { v: k for k, v in self.special_tokens.items() } # for k, v in self.special_tokens.items(): # self.tokenizer.decoder[v] = "\u0120" + k logger.info('Special tokens {}'.format(self.special_tokens)) def get_command(self, token): return self.special_tokens[token] def contains_sentence_end(self, idx): return idx in self.sentence_end_decoder def IdToToken(self, idx): if idx == 0: return '[pad]' elif idx in self.special_tokens_decoder: return f'[{self.special_tokens_decoder[idx]}]' else: return self.tokenizer.decode([idx]) def TokenToId(self, token): if token == '[pad]': return 0 elif token in self.special_tokens: return self.special_tokens[token] else: return self.tokenizer.encode(token)[0] @property def vocab_size(self): return self.tokenizer.num_tokens @property def vocab(self): assert False return self.tokenizer.encoder @property def inv_vocab(self): assert False return self.tokenizer.decoder def tokenize(self, text): return self.tokenizer.encode(text) def detokenize(self, token_ids): split = [-1] for i, token in enumerate(token_ids): if token in self.special_tokens_decoder: split.append(i) split.append(len(token_ids)) text = '' for i in range(len(split) - 1): if i > 0: text += self.IdToToken(token_ids[split[i]]) text += self.tokenizer.decode(token_ids[split[i] + 1:split[i + 1]]) return text @property def eod(self): return self.get_special_token('eod')