notebooks/packed_bert/pipeline/packed_bert.py (342 lines of code) (raw):

import logging import time from typing import Dict, List import numpy as np import torch from datasets import Dataset import poptorch from models.modeling_bert_packed import ( PipelinedPackedBertForQuestionAnswering, PipelinedPackedBertForSequenceClassification, ) from optimum.graphcore import IPUConfig from scipy.special import softmax from transformers import AutoConfig, AutoTokenizer from transformers.data.data_collator import default_data_collator from utils.packing.dataset_creator import PackedDatasetCreator from utils.packing.dataset_templates import PackedQuestionAnsweringDataset from utils.packing.qa_utils import postprocess_packed_qa_predictions, preprocess_packed_qa logger = logging.getLogger("") def get_poplar_executor(model, ipu_config, batch, detach=False): ipu_options = ipu_config.to_options(for_inference=True) model.ipu_config = ipu_config if isinstance(model, poptorch.PoplarExecutor): print("Model already wrapped - nothing to do.") return model try: model.deparallelize() except: pass ipu_model = poptorch.inferenceModel(model.eval().parallelize(), ipu_options) ipu_model.compile(**batch) if detach: ipu_model.detachFromDevice() return ipu_model def prepare_inference_dataloader(ipu_config, dataset, batch_size, mode="async_rebatched"): return poptorch.DataLoader( ipu_config.to_options(for_inference=True), dataset, batch_size=batch_size, shuffle=False, # Must be false, retained order important for batched inference drop_last=False, # Must be false, we pad up to global batch size in inference pipeline to avoid any division error mode=mode, collate_fn=default_data_collator, ) class PackedBertTextClassificationPipeline: """ Packed classification pipeline: Batched inference pipeline for packed BERT text classification with multi/single label. Wraps all preprocessing and model for inference, executes on text inputs in format `questions, contexts` of any size, proceeds to batch according to checkpoint or as per custom IPU configs, and packs data. Performs inference on PipelinedPackedBertForSequenceClassification. Returns postprocessed predictions in same order as input data. """ def __init__( self, model, executable_cache_dir: str = "./exe_cache", problem_type: str = "single_label_classification", max_seq_per_pack: int = 12, max_seq_length: int = 384, ipu_config: IPUConfig = None, micro_batch_size: int = 1, dataloader_mode: str = "async_rebatched", detach_model_after_compile: bool = False, pretrained_tokenizer: str = "bert-base-uncased", label_categories: List = [], ) -> None: self.model_ckpt = model self.problem_type = problem_type self.max_seq_per_pack = max_seq_per_pack self.max_seq_length = max_seq_length self.pretrained_tokenizer = pretrained_tokenizer self.dataloader_mode = dataloader_mode self.detach_model_after_post_compile = detach_model_after_compile self.executable_cache_dir = executable_cache_dir self.micro_batch_size = micro_batch_size self.sentence_2_key = None self.label_categories = label_categories if not ipu_config: try: logger.info("Attempting loading IPUConfig from model checkpoint:") self.ipu_config = IPUConfig.from_pretrained( self.model_ckpt, executable_cache_dir=self.executable_cache_dir ) except: logger.warn( "Loading default config: 'Graphcore/bert-base-uncased' - because no IPUConfig found in model folder." ) self.ipu_config = IPUConfig.from_pretrained( "Graphcore/bert-base-uncased", executable_cache_dir=self.executable_cache_dir ) else: self.ipu_config = ipu_config self.gbs = ( self.ipu_config.inference_device_iterations * self.ipu_config.inference_replication_factor * self.micro_batch_size ) try: logger.info("Attempting loading tokenizer from model checkpoint") self.tokenizer = AutoTokenizer.from_pretrained(self.model_ckpt, use_fast=True) except: logger.warn("Loading tokenizer from defined because no pretrained tokenizer found in model folder.") self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_tokenizer, use_fast=True) config = AutoConfig.from_pretrained(self.model_ckpt) config.max_sequences_per_pack = self.max_seq_per_pack config.problem_type = self.problem_type self.model = ( PipelinedPackedBertForSequenceClassification(config).from_pretrained(self.model_ckpt, config=config).half() ) compile_data = Dataset.from_dict({"text": ["I am a dummy sentence for compilation."]}) enc_compile_data = compile_data.map(self.preprocess_function, batched=True) pck_compile_data = PackedDatasetCreator( tokenized_dataset=enc_compile_data, max_sequence_length=self.max_seq_length, max_sequences_per_pack=self.max_seq_per_pack, inference=True, pad_to_global_batch_size=True, global_batch_size=self.gbs, problem_type=self.problem_type, ).create() c_dataloader = prepare_inference_dataloader( self.ipu_config, pck_compile_data, self.micro_batch_size, self.dataloader_mode ) c_batch = next(iter(c_dataloader)) # Remove custom column for compile - autoignored in optimum, manually ignored in predict c_batch.pop("example_ids", None) self.poplar_executor = get_poplar_executor(self.model, self.ipu_config, c_batch) def preprocess_function(self, examples): if self.sentence_2_key: return self.tokenizer( examples["text"], examples["text_2"], truncation=True, max_length=self.max_seq_length ) else: return self.tokenizer(examples["text"], truncation=True, max_length=self.max_seq_length) def postprocess_preds(self, logits, ids): ids = torch.concat(ids) mask = ids != -100 ids = ids[mask] if self.problem_type == "multi_label_classification": pred_scores = softmax(torch.concat(logits)[mask, :].numpy().astype("float32"), axis=1) if self.problem_type == "single_label_classification": pred_scores = softmax(torch.concat(logits)[mask, :].numpy().astype("float32"), axis=1) pred_scores = pred_scores[np.argsort(ids)] return pred_scores def predict(self, sentence_1, sentence_2=None): self.sentence_2_key = sentence_2 prep_st = time.time() data_dict = {"text": sentence_1} if sentence_2: data_dict["text_2"] = sentence_2 dataset = Dataset.from_dict(data_dict) enc_data = dataset.map(self.preprocess_function, batched=True) # Pack the inputs packed_data = PackedDatasetCreator( tokenized_dataset=enc_data, max_sequence_length=self.max_seq_length, max_sequences_per_pack=self.max_seq_per_pack, inference=True, pad_to_global_batch_size=True, global_batch_size=self.gbs, problem_type=self.problem_type, ).create() dataloader = prepare_inference_dataloader( self.ipu_config, packed_data, self.micro_batch_size, self.dataloader_mode ) example_ids = [] outputs = [] # Process the model to return logits prep_time = time.time() - prep_st model_st = time.time() for batch in iter(dataloader): logits = self.poplar_executor( input_ids=batch["input_ids"], attention_mask=batch["attention_mask"], token_type_ids=batch["token_type_ids"], position_ids=batch["position_ids"], ) ids = batch["example_ids"] outputs.append(logits.view(ids.shape[0], self.max_seq_per_pack, -1)) example_ids.append(ids) model_en = time.time() model_time = model_en - model_st tput = len(sentence_1) / (model_time) # Postprocess predictions to preserve order post_st = time.time() final_preds = self.postprocess_preds(outputs, example_ids) if len(self.label_categories) == final_preds.shape[-1]: final_preds = {k: dict(list(zip(self.label_categories, v))) for k, v in enumerate(final_preds)} else: final_preds = {{n: k[n] for n in k} for k in final_preds} post_proc_time = time.time() - post_st return { "predictions": final_preds, "throughput": tput, "inference_total_time": model_time, "preprocessing_time": prep_time, "postprocessing_time": post_proc_time, } class PackedBertQuestionAnsweringPipeline: """ Packed Question-answering pipeline: Batched inference pipeline for packed BERT question answering. Wraps all preprocessing and model for inference, executes on text inputs in format `questions, contexts` of any size, proceeds to batch according to checkpoint or as per custom IPU configs, and packs data. Performs inference on PipelinedPackedBertForQuestionAnswering. Returns postprocessed predictions in same order as input data. """ def __init__( self, model, executable_cache_dir: str = "./exe_cache", problem_type: str = "question_answering", max_seq_per_pack: int = 12, max_seq_length: int = 384, pretrained_tokenizer: str = "bert-base-uncased", ipu_config: str = None, micro_batch_size: int = 1, dataloader_mode: str = "async_rebatched", detach_model_after_compile: bool = False, ) -> None: self.problem_type = problem_type self.max_seq_per_pack = max_seq_per_pack self.max_seq_length = max_seq_length self.model_ckpt = model self.pretrained_tokenizer = pretrained_tokenizer self.dataloader_mode = dataloader_mode self.detach_model_after_post_compile = detach_model_after_compile self.executable_cache_dir = executable_cache_dir self.micro_batch_size = micro_batch_size if not ipu_config: try: logger.info("Attempting loading IPUConfig from model checkpoint:") self.ipu_config = IPUConfig.from_pretrained( self.model_ckpt, executable_cache_dir=self.executable_cache_dir ) except: logger.warn( "Loading default config: 'Graphcore/bert-base-uncased' - because no IPUConfig found in model folder." ) self.ipu_config = IPUConfig.from_pretrained( "Graphcore/bert-base-uncased", executable_cache_dir=self.executable_cache_dir ) else: self.ipu_config = ipu_config self.gbs = ( self.ipu_config.inference_device_iterations * self.ipu_config.inference_replication_factor * self.micro_batch_size ) try: self.tokenizer = AutoTokenizer.from_pretrained(self.model_ckpt, use_fast=True) except: self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_tokenizer, use_fast=True) config = AutoConfig.from_pretrained(self.model_ckpt) config.max_sequences_per_pack = self.max_seq_per_pack config.problem_type = self.problem_type self.model = ( PipelinedPackedBertForQuestionAnswering(config).from_pretrained(self.model_ckpt, config=config).half() ) compile_data = Dataset.from_dict( { "id": np.array([str(i) for i in range(self.gbs)]).astype("<U32"), "question": ["Do trees have leaves in the wintertime?."] * self.gbs, "context": [ "Most trees leaves fall off after the autumn season. However, evergreen trees keep their leaves through winter." ] * self.gbs, } ) enc_compile_data = preprocess_packed_qa( dataset=compile_data, tokenizer=self.tokenizer, question_key="question", context_key="context", answer_key="answer", sequence_length=self.max_seq_length, padding=True, # only for compile, so we dont need to pack the dummy data train=False, ) packed_compile_data_pre = PackedDatasetCreator( tokenized_dataset=enc_compile_data, max_sequence_length=self.max_seq_length, max_sequences_per_pack=self.max_seq_per_pack, inference=True, pad_to_global_batch_size=True, global_batch_size=self.gbs, problem_type=self.problem_type, ).create() packed_compile_data = Dataset.from_list(packed_compile_data_pre) packed_compile_data = packed_compile_data.remove_columns(["offset_mapping", "example_ids"]) c_dataloader = prepare_inference_dataloader( self.ipu_config, packed_compile_data, self.micro_batch_size, self.dataloader_mode ) c_batch = next(iter(c_dataloader)) c_batch.pop("offset_mapping", None) c_batch.pop("example_id", None) self.poplar_executor = get_poplar_executor(self.model, self.ipu_config, c_batch) def predict(self, questions, contexts): prep_st = time.time() dataset = Dataset.from_dict( { "id": np.array([str(i) for i in range(len(questions))]).astype("<U32"), "question": questions, "context": contexts, } ) enc_data = preprocess_packed_qa( dataset=dataset, tokenizer=self.tokenizer, question_key="question", context_key="context", answer_key="answer", sequence_length=self.max_seq_length, padding=False, train=False, ) packed_data_pre = PackedDatasetCreator( tokenized_dataset=enc_data, max_sequence_length=self.max_seq_length, max_sequences_per_pack=self.max_seq_per_pack, inference=True, pad_to_global_batch_size=True, global_batch_size=self.gbs, problem_type=self.problem_type, ).create() # Not the most efficient way... packed_data = Dataset.from_list(packed_data_pre) packed_data = packed_data.remove_columns(["offset_mapping", "example_ids"]) packed_data = PackedQuestionAnsweringDataset( input_ids=packed_data["input_ids"], attention_mask=packed_data["attention_mask"], token_type_ids=packed_data["token_type_ids"], position_ids=packed_data["position_ids"], start_positions=None, end_positions=None, offset_mapping=None, example_ids=None, ) dataloader = prepare_inference_dataloader( self.ipu_config, packed_data, self.micro_batch_size, self.dataloader_mode ) outputs = [] prep_time = time.time() - prep_st model_st = time.time() for batch in iter(dataloader): logits = self.poplar_executor(**batch) outputs.append(torch.stack(logits)) model_en = time.time() model_time = model_en - model_st tput = len(questions) / (model_time) post_st = time.time() outputs = torch.cat(outputs, dim=1).numpy() final_preds = postprocess_packed_qa_predictions(dataset, packed_data_pre, outputs) formatted_predictions = [{"id": k, "prediction_text": v} for k, v in final_preds.items()] post_proc_time = time.time() - post_st return { "predictions": formatted_predictions, "throughput": tput, "inference_total_time": model_time, "preprocessing_time": prep_time, "postprocessing_time": post_proc_time, }