review_sentiment_flow.py

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import os

from metaflow import (
    FlowSpec,
    IncludeFile,
    Parameter,
    card,
    current,
    step,
    environment,
    kubernetes,
    pypi,
    nvidia,
    retry,
)

GCS_PROJECT_NAME = "moz-fx-mlops-inference-nonprod"
GCS_BUCKET_NAME = "mf-models-test1"
MODEL_STORAGE_PATH = "ctroy-example-flow/model-bytes.pth"
TOKENIZER_STORAGE_PATH = "models/tokenizer/"


class ReviewSentimentFlow(FlowSpec):
    """
    A sample flow demonstrating the use of custom docker images and GPU
    facilities to train a biggish machine learning model (i.e. not a toy
    example on the iris dataset).
    """

    # This is an example of a parameter. You can toggle this when you call the flow
    # with python template_flow.py run --offline False
    offline_wandb = Parameter(
        "offline",
        help="Do not connect to W&B servers when training",
        type=bool,
        default=True,
    )

    # You can import the contents of files from your file system to use in flows.
    # This is meant for small files; in this example, a bit of config.
    example_config = IncludeFile("example_config", default="./example_config.json")

    @card(type="default")
    @kubernetes
    @step
    def start(self):
        """
        Each flow has a 'start' step. You can use it for collecting/preprocessing
        data or other setup tasks.
        """
        self.next(self.train_model)
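
    # The two GPU steps below import `ReviewSentimentModel` from the companion
    # module `review_sentiment_model.py`, which is not shown in this file. As a
    # rough orientation only (an assumption, not the actual module), it is
    # presumably a thin nn.Module that wraps the Hugging Face transformer and
    # adds a classification head, matching the (transformer, output_dim, freeze)
    # call sites below, along the lines of:
    #
    #     import torch.nn as nn
    #
    #     class ReviewSentimentModel(nn.Module):
    #         def __init__(self, transformer, output_dim, freeze):
    #             super().__init__()
    #             self.transformer = transformer
    #             self.fc = nn.Linear(transformer.config.hidden_size, output_dim)
    #             if freeze:
    #                 for param in self.transformer.parameters():
    #                     param.requires_grad = False
    #
    #         def forward(self, ids):
    #             # [batch, seq len] -> [batch, hidden] -> [batch, output_dim]
    #             hidden = self.transformer(ids).last_hidden_state
    #             return self.fc(hidden[:, 0, :])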

    @card
    @environment(
        vars={
            "WANDB_API_KEY": os.getenv("WANDB_API_KEY"),
            "WANDB_PROJECT": os.getenv("WANDB_PROJECT"),
        }
    )
    @pypi(
        python='3.10.8',
        packages={
            'torch': '2.4.1',
            'wandb': '0.17.8',
            'datasets': '3.0.0',
            'numpy': '1.26.4',
            'tqdm': '4.66.5',
            'transformers': '4.44.2',
            'mozmlops': '0.1.4',
        },
    )
    @nvidia
    @retry
    @step
    def train_model(self):
        """
        Trains a transformer model on movie reviews using NVIDIA GPUs.
        """
        import collections
        import json

        import datasets
        import numpy as np
        import torch
        import torch.nn as nn
        import torch.optim as optim
        import tqdm
        import transformers
        import wandb
        from io import BytesIO
        from metaflow.cards import Markdown

        from review_sentiment_model import ReviewSentimentModel

        config_as_dict = json.loads(self.example_config)
        print(f"The config file says: {config_as_dict.get('example_key')}")

        if not self.offline_wandb:
            tracking_run = wandb.init(project=os.getenv("WANDB_PROJECT"))
            wandb_url = tracking_run.get_url()
            current.card.append(Markdown("# Weights & Biases"))
            current.card.append(
                Markdown(f"Your training run is tracked [here]({wandb_url}).")
            )

        print("All set. Running training.")

        # Make the run reproducible.
        seed = 1234
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True

        train_data, test_data = datasets.load_dataset("imdb", split=["train", "test"])

        transformer_name = "bert-base-uncased"
        tokenizer = transformers.AutoTokenizer.from_pretrained(transformer_name)

        print(tokenizer.tokenize("hello world!"))
        print(tokenizer.encode("hello world!"))
        print(tokenizer.convert_ids_to_tokens(tokenizer.encode("hello world")))
        print(tokenizer("hello world!"))

        def tokenize_and_numericalize_example(example, tokenizer):
            ids = tokenizer(example["text"], truncation=True)["input_ids"]
            return {"ids": ids}

        train_data = train_data.map(
            tokenize_and_numericalize_example, fn_kwargs={"tokenizer": tokenizer}
        )
        test_data = test_data.map(
            tokenize_and_numericalize_example, fn_kwargs={"tokenizer": tokenizer}
        )

        print(train_data[0])

        test_size = 0.25
        pad_index = tokenizer.pad_token_id

        train_valid_data = train_data.train_test_split(test_size=test_size)
        train_data = train_valid_data["train"]
        valid_data = train_valid_data["test"]

        train_data = train_data.with_format(type="torch", columns=["ids", "label"])
        valid_data = valid_data.with_format(type="torch", columns=["ids", "label"])
        test_data = test_data.with_format(type="torch", columns=["ids", "label"])

        def get_collate_fn(pad_index):
            def collate_fn(batch):
                # Pad every sequence in the batch to the length of the longest one.
                batch_ids = [i["ids"] for i in batch]
                batch_ids = nn.utils.rnn.pad_sequence(
                    batch_ids, padding_value=pad_index, batch_first=True
                )
                batch_label = [i["label"] for i in batch]
                batch_label = torch.stack(batch_label)
                batch = {"ids": batch_ids, "label": batch_label}
                return batch

            return collate_fn

        def get_data_loader(dataset, batch_size, pad_index, shuffle=False):
            collate_fn = get_collate_fn(pad_index)
            data_loader = torch.utils.data.DataLoader(
                dataset=dataset,
                batch_size=batch_size,
                collate_fn=collate_fn,
                shuffle=shuffle,
            )
            return data_loader

        batch_size = 8

        train_data_loader = get_data_loader(
            train_data, batch_size, pad_index, shuffle=True
        )
        valid_data_loader = get_data_loader(valid_data, batch_size, pad_index)
        test_data_loader = get_data_loader(test_data, batch_size, pad_index)

        transformer = transformers.AutoModel.from_pretrained(transformer_name)

        self.output_dim = len(train_data["label"].unique())
        freeze = False

        model = ReviewSentimentModel(transformer, self.output_dim, freeze)

        lr = 1e-5
        optimizer = optim.Adam(model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model = model.to(device)
        criterion = criterion.to(device)

        def train(data_loader, model, criterion, optimizer, device):
            model.train()
            epoch_losses = []
            epoch_accs = []
            for batch in tqdm.tqdm(data_loader, desc="training..."):
                ids = batch["ids"].to(device)
                label = batch["label"].to(device)
                prediction = model(ids)
                loss = criterion(prediction, label)
                accuracy = get_accuracy(prediction, label)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                epoch_losses.append(loss.item())
                epoch_accs.append(accuracy.item())
            return np.mean(epoch_losses), np.mean(epoch_accs)

        def evaluate(data_loader, model, criterion, device):
            model.eval()
            epoch_losses = []
            epoch_accs = []
            with torch.no_grad():
                for batch in tqdm.tqdm(data_loader, desc="evaluating..."):
                    ids = batch["ids"].to(device)
                    label = batch["label"].to(device)
                    prediction = model(ids)
                    loss = criterion(prediction, label)
                    accuracy = get_accuracy(prediction, label)
                    epoch_losses.append(loss.item())
                    epoch_accs.append(accuracy.item())
            return np.mean(epoch_losses), np.mean(epoch_accs)

        def get_accuracy(prediction, label):
            batch_size, _ = prediction.shape
            predicted_classes = prediction.argmax(dim=-1)
            correct_predictions = predicted_classes.eq(label).sum()
            accuracy = correct_predictions / batch_size
            return accuracy

        n_epochs = 3
        best_valid_loss = float("inf")
        metrics = collections.defaultdict(list)

        for epoch in range(n_epochs):
            train_loss, train_acc = train(
                train_data_loader, model, criterion, optimizer, device
            )
            valid_loss, valid_acc = evaluate(
                valid_data_loader, model, criterion, device
            )
            metrics["train_losses"].append(train_loss)
            metrics["train_accs"].append(train_acc)
            metrics["valid_losses"].append(valid_loss)
            metrics["valid_accs"].append(valid_acc)
            if valid_loss < best_valid_loss:
                best_valid_loss = valid_loss
                torch.save(model.state_dict(), "transformer.pt")
            print(f"epoch: {epoch}")
            print(f"train_loss: {train_loss:.3f}, train_acc: {train_acc:.3f}")
            print(f"valid_loss: {valid_loss:.3f}, valid_acc: {valid_acc:.3f}")

        # Evaluate the best checkpoint (by validation loss) on the test set.
        model.load_state_dict(torch.load("transformer.pt"))
        test_loss, test_acc = evaluate(test_data_loader, model, criterion, device)
        print(f"test_loss: {test_loss:.3f}, test_acc: {test_acc:.3f}")

        # Persist the trained weights and the tokenizer as Metaflow artifacts
        # so the downstream steps can reconstruct them.
        self.device = device

        buffer = BytesIO()
        torch.save(model.state_dict(), buffer)
        self.model_state_dict_bytes = buffer.getvalue()

        self.tokenizer_as_dict = {}
        tokenizer.save_pretrained("tok")
        with open('tok/tokenizer.json') as file:
            self.tokenizer_as_dict = json.load(file)

        self.next(self.error_analysis)
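
    # `self.model_state_dict_bytes`, `self.tokenizer_as_dict`, `self.device`,
    # and `self.output_dim` are persisted by Metaflow as run artifacts, which is
    # how the later steps pick them up. As a rough sketch, the same artifacts
    # can also be inspected afterwards via the Metaflow client API (names below
    # follow this flow; adapt as needed):
    #
    #     from metaflow import Flow
    #
    #     run = Flow("ReviewSentimentFlow").latest_run
    #     data = run["train_model"].task.data
    #     print(len(data.model_state_dict_bytes), data.output_dim)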

    @pypi(
        python='3.10.8',
        packages={
            'torch': '2.4.1',
            'wandb': '0.17.8',
            'tokenizers': '0.20.0',
        },
    )
    @kubernetes
    @step
    def error_analysis(self):
        """
        Predict the sentiment of some sample movie reviews and see, on an
        individual level, how the predictions look.
        """
        import json
        from io import BytesIO

        import torch
        from tokenizers import Tokenizer

        from review_sentiment_model import ReviewSentimentModel

        device = self.device

        # Rebuild the tokenizer from the dict artifact produced by train_model.
        with open('tokenizer.json', 'w') as fp:
            json.dump(self.tokenizer_as_dict, fp)
        tokenizer = Tokenizer.from_file("tokenizer.json")

        # Rebuild the model and load the trained weights from the bytes artifact.
        model = ReviewSentimentModel(tokenizer, self.output_dim, False)
        buffer = BytesIO(self.model_state_dict_bytes)
        model.load_state_dict(
            torch.load(buffer, map_location=device, weights_only=True)
        )

        def predict_sentiment(text, model, tokenizer, device):
            ids = tokenizer(text)["input_ids"]
            tensor = torch.LongTensor(ids).unsqueeze(dim=0).to(device)
            prediction = model(tensor).squeeze(dim=0)
            probability = torch.softmax(prediction, dim=-1)
            predicted_class = prediction.argmax(dim=-1).item()
            predicted_probability = probability[predicted_class].item()
            return predicted_class, predicted_probability

        print(
            "(Clearly these are toy examples; one could load a batch of examples "
            "here for more rigorous error analysis)"
        )

        text = "This film is terrible!"
        print(f"Analysis of text: {text}")
        print(predict_sentiment(text, model, tokenizer, device))

        text = "This film is not terrible, it's great!"
        print(f"Analysis of text: {text}")
        print(predict_sentiment(text, model, tokenizer, device))

        self.next(self.upload_model_to_gcs)
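
    # Once uploaded by the next step, the state-dict bytes can be pulled back
    # down and reloaded elsewhere (e.g. for serving). A minimal sketch using the
    # plain google-cloud-storage client rather than mozmlops (the bucket and
    # path come from the constants at the top of this file; the surrounding
    # serving code is an assumption):
    #
    #     from io import BytesIO
    #     import torch
    #     from google.cloud import storage
    #
    #     blob = (
    #         storage.Client(project=GCS_PROJECT_NAME)
    #         .bucket(GCS_BUCKET_NAME)
    #         .blob(MODEL_STORAGE_PATH)
    #     )
    #     state_dict = torch.load(
    #         BytesIO(blob.download_as_bytes()),
    #         map_location="cpu",
    #         weights_only=True,
    #     )
    #     model.load_state_dict(state_dict)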
print(f"Analysis of text: {text}") print(predict_sentiment(text, model, tokenizer, device)) self.next(self.upload_model_to_gcs) @pypi(python='3.10.8', packages={ 'mozmlops': '0.1.4' }) @kubernetes @step def upload_model_to_gcs(self): from mozmlops.cloud_storage_api_client import CloudStorageAPIClient print(f"Uploading model to gcs") # init client storage_client = CloudStorageAPIClient( project_name=GCS_PROJECT_NAME, bucket_name=GCS_BUCKET_NAME ) storage_client.store(data=self.model_state_dict_bytes, storage_path=MODEL_STORAGE_PATH) self.next(self.end) @kubernetes @step def end(self): """ This is the mandatory 'end' step: it prints some helpful information to access the model and the used dataset. """ print( f""" Flow complete. """ ) if __name__ == "__main__": ReviewSentimentFlow()