# torchbenchmark/models/fastNLP_Bert/__init__.py

""" fastNLP model (TorchBenchmark Version) This model resembles the "BertEmedding Q&A" task in [fastNLP Tutorial](https://fastnlp.readthedocs.io/zh/latest/tutorials/extend_1_bert_embedding.html). Input data simulates [CMRC2018 dataset](https://ymcui.com/cmrc2018/). The program runs only for benchmark purposes and doesn't provide correctness results. """ import logging from typing import Tuple import torch import random import inspect import numpy as np from fastNLP.embeddings import BertEmbedding from fastNLP.models import BertForQuestionAnswering from fastNLP.core.callback import CallbackManager from fastNLP.core.batch import DataSetIter from fastNLP.core.losses import CMRC2018Loss from fastNLP.core.metrics import CMRC2018Metric from fastNLP.io.pipe.qa import CMRC2018BertPipe from fastNLP import WarmupCallback, GradientClipCallback from fastNLP.core.optimizer import AdamW from fastNLP.core import logger # Import CMRC2018 data generator from .cmrc2018_simulator import generate_inputs from .cmrc2018_simulator import CMRC2018_DIR, CMRC2018_CONFIG_DIR # TorchBench imports from torchbenchmark.util.model import BenchmarkModel from torchbenchmark.tasks import NLP torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False logger.setLevel(logging.WARNING) class Model(BenchmarkModel): task = NLP.OTHER_NLP # Use the train batch size from the original CMRC2018 Q&A task # Source: https://fastnlp.readthedocs.io/zh/latest/tutorials/extend_1_bert_embedding.html DEFAULT_TRAIN_BSIZE = 6 DEFAULT_EVAL_BSIZE = 1 def __init__(self, test, device, jit=False, batch_size=None, extra_args=[]): super().__init__(test=test, device=device, jit=jit, batch_size=batch_size, extra_args=extra_args) self.input_dir = CMRC2018_DIR # Generate input data files # FastNLP loader requires both train and eval files, so we need to generate both of them if test == "train": generate_inputs(train_batch_size=self.batch_size, eval_batch_size=self.DEFAULT_EVAL_BSIZE) elif test == "eval": generate_inputs(train_batch_size=self.DEFAULT_TRAIN_BSIZE, eval_batch_size=self.batch_size) data_bundle = CMRC2018BertPipe().process_from_file(paths=self.input_dir) data_bundle.rename_field('chars', 'words') self.embed = BertEmbedding(data_bundle.get_vocab('words'), model_dir_or_name=CMRC2018_CONFIG_DIR, requires_grad=True, include_cls_sep=False, auto_truncate=True, dropout=0.5, word_dropout=0.01) self.model = self._move_model_to_device(BertForQuestionAnswering(self.embed), device=device) if self._model_contains_inner_module(self.model): self._forward_func = self.model.module.forward else: self._forward_func = self.model.forward # Do not spawn new processes on small scale of data self.num_workers = 0 if self.test == "train": self.model.train() self.trainer = self.model self.train_data = data_bundle.get_dataset('train') self.data = self.train_data self.losser = CMRC2018Loss() self.metrics = CMRC2018Metric() self.update_every = 10 wm_callback = WarmupCallback(schedule='linear') gc_callback = GradientClipCallback(clip_value=1, clip_type='norm') callbacks = [wm_callback, gc_callback] self.optimizer = AdamW(self.model.parameters(), lr=5e-5) self.callback_manager = CallbackManager(env={"trainer":self}, callbacks=callbacks) elif self.test == "eval": self.model.eval() self.data = data_bundle.get_dataset('dev') self.example_inputs = DataSetIter(dataset=self.data, batch_size=self.batch_size, sampler=None, num_workers=self.num_workers, drop_last=False) def get_module(self): batch_x, batch_y = list(self.example_inputs)[0] 
        self._move_dict_value_to_device(batch_x, batch_y, device=self.device)
        return self.model, (batch_x["words"], )

    # Sliced version of fastNLP.Tester._test()
    def eval(self, niter=1) -> Tuple[torch.Tensor, torch.Tensor]:
        self._mode(self.model, is_test=True)
        self._predict_func = self.model.forward
        with torch.no_grad():
            for epoch in range(niter):
                for batch_x, batch_y in self.example_inputs:
                    self._move_dict_value_to_device(batch_x, batch_y, device=self.device)
                    pred_dict = self._data_forward(self._predict_func, batch_x)
        # Return a tuple of Tensors
        return (pred_dict['pred_start'], pred_dict['pred_end'])

    # Sliced version of fastNLP.Trainer._train()
    def train(self, niter=1):
        self.step = 0
        self.n_epochs = niter
        self._mode(self.model, is_test=False)
        self.callback_manager.on_train_begin()
        # Move the data to the GPU before the train loop
        for batch_x, batch_y in self.example_inputs:
            self._move_dict_value_to_device(batch_x, batch_y, device=self.device)
        for epoch in range(niter):
            self.callback_manager.on_epoch_begin()
            for batch_x, batch_y in self.example_inputs:
                self._move_dict_value_to_device(batch_x, batch_y, device=self.device)
                self.step += 1
                prediction = self._data_forward(self.model, batch_x)
                self.callback_manager.on_loss_begin(batch_y, prediction)
                loss = self._compute_loss(prediction, batch_y).mean()
                self.callback_manager.on_backward_begin(loss)
                self._grad_backward(loss)
                self.callback_manager.on_backward_end()
                self._update()
                self.callback_manager.on_step_end()
                self.callback_manager.on_batch_end()
            self.callback_manager.on_epoch_end()
        self.callback_manager.on_train_end()

    # Helper functions
    def _build_args(self, func, **kwargs):
        spect = inspect.getfullargspec(func)
        if spect.varkw is not None:
            return kwargs
        needed_args = set(spect.args)
        defaults = []
        if spect.defaults is not None:
            defaults = [arg for arg in spect.defaults]
        start_idx = len(spect.args) - len(defaults)
        output = {name: default for name, default in zip(spect.args[start_idx:], defaults)}
        output.update({name: val for name, val in kwargs.items() if name in needed_args})
        return output

    def _move_dict_value_to_device(self, *args, device, non_blocking=False):
        if not torch.cuda.is_available() or device is None:
            return
        for arg in args:
            if isinstance(arg, dict):
                for key, value in arg.items():
                    if isinstance(value, torch.Tensor):
                        arg[key] = value.to(device, non_blocking=non_blocking)
            else:
                raise TypeError("Only support `dict` type right now.")

    def _model_contains_inner_module(self, model):
        if isinstance(model, torch.nn.Module):
            if isinstance(model, (torch.nn.DataParallel, torch.nn.parallel.DistributedDataParallel)):
                return True
        return False

    def _move_model_to_device(self, model, device):
        model = model.to(device)
        return model

    def _mode(self, model, is_test=False):
        r"""Switch the model between train mode and eval mode. This is for PyTorch currently.

        :param model: a PyTorch model
        :param bool is_test: whether in test mode or not.
        """
        if is_test:
            model.eval()
        else:
            model.train()

    def _update(self):
        r"""Perform a weight update on the model every `update_every` steps."""
        if self.step % self.update_every == 0:
            self.optimizer.step()

    def _data_forward(self, network, x):
        x = self._build_args(self._forward_func, **x)
        y = network(**x)
        if not isinstance(y, dict):
            raise TypeError(
                f"The return value of {_get_func_signature(self._forward_func)} should be dict, got {type(y)}.")
        return y

    def _grad_backward(self, loss):
        r"""Compute gradients by back-propagation.
        :param loss: a scalar where back-prop starts

        For PyTorch, just do "loss.backward()".
        """
        if (self.step - 1) % self.update_every == 0:
            self.model.zero_grad()
        loss.backward()

    def _compute_loss(self, predict, truth):
        r"""Compute loss given prediction and ground truth.

        :param predict: prediction dict, produced by model.forward
        :param truth: ground truth dict, produced by batch_y
        :return: a scalar
        """
        return self.losser(predict, truth)
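

# The snippet below is NOT part of the benchmark itself; it is a minimal, hypothetical
# smoke-test sketch added for illustration. It assumes the fastNLP dependencies are
# installed and that the CMRC2018 simulator can generate its config/data files locally.
# TorchBench normally drives this class through its own harness (e.g. run.py) instead.
if __name__ == "__main__":
    # Instantiate the eval variant on CPU; batch size falls back to DEFAULT_EVAL_BSIZE.
    m = Model(test="eval", device="cpu")

    # Option 1: run one forward pass directly through the underlying nn.Module.
    module, example_inputs = m.get_module()
    with torch.no_grad():
        out = module(*example_inputs)  # dict with 'pred_start' and 'pred_end'

    # Option 2: run one benchmark iteration through the Model wrapper.
    pred_start, pred_end = m.eval(niter=1)
    print(pred_start.shape, pred_end.shape)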