# src/mlm/cmds.py

# Standard-library and third-party imports used by cmd_score below.
# Project-internal names (setup_ctxs, _stdout_to_stderr, get_pretrained, the
# scorer classes, and Corpus/Predictions/ScoredCorpus) are imported elsewhere
# in this module.
import argparse
import logging
import math
import sys
from pathlib import Path

import numpy as np
def cmd_score(args: argparse.Namespace) -> None:
    """mlm score command: score hypotheses or references with the chosen (pseudo-)LM scorer."""
    # Get model; setup_ctxs maps the --gpus argument to device contexts
    ctxs = setup_ctxs(args.gpus)
    weights_file = Path(args.weights) if isinstance(args.weights, str) else None
    # Redirect console output from GluonNLP downloading models
    with _stdout_to_stderr():
        # The --no-mask flag maps to the regression variant of the model
        model, vocab, tokenizer = get_pretrained(ctxs, args.model, weights_file, regression=args.no_mask)
# Set scorer
if MLMScorer._check_support(model):
# GluonNLP
scorer = MLMScorer(model, vocab, tokenizer, eos=args.eos, wwm=args.whole_word_mask, capitalize=args.capitalize, ctxs=ctxs)
elif MLMScorerPT._check_support(model):
# Transformers
scorer = MLMScorerPT(model, vocab, tokenizer, eos=args.eos, wwm=args.whole_word_mask, capitalize=args.capitalize, ctxs=ctxs, lang=args.tgt)
elif RegressionScorer._check_support(model):
scorer = RegressionScorer(model, vocab, tokenizer, eos=args.eos, wwm=args.whole_word_mask, capitalize=args.capitalize, ctxs=ctxs)
elif LMScorer._check_support(model):
assert not args.whole_word_mask
assert not args.no_mask
scorer = LMScorer(model, vocab, tokenizer, eos=args.eos, capitalize=args.capitalize, ctxs=ctxs)
else:
raise ValueError(f"Model '{model.__class__.__name__}' not supported by any scorer.")
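    # All four scorer classes share the interface used below: score(corpus, ...)
    # returns (scores, true_tok_lens), i.e. one (pseudo-)log-likelihood and one
    # subword token count per sentence. (Interface summary inferred from this
    # function's own usage, not from a documented public contract.)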
    # What data do we use?
    if args.mode == 'hyp':
        preds = Predictions.from_file(args.infile, max_utts=args.max_utts)
        # We 'deserialize' the predictions into a corpus, for better batching
        corpus = preds.to_corpus()
        logging.warning("# of input sequences: {}".format(len(preds)))
        logging.warning("# of hypotheses: {}".format(len(corpus)))
    elif args.mode == 'ref':
        corpus = Corpus.from_file(args.infile, max_utts=args.max_utts)
        logging.warning("# sentences: {}".format(len(corpus)))
    else:
        # argparse should restrict --mode, but fail loudly here rather than
        # fall through with `corpus` unbound
        raise ValueError(f"Unknown mode '{args.mode}'.")
    # === START SHARED COMPUTATION ===
    # A scorer takes a corpus and produces a list of scores in the corpus's order
    if args.detok:
        corpus_for_scoring = corpus.detok_copy()
    else:
        corpus_for_scoring = corpus
    scores, true_tok_lens = scorer.score(corpus_for_scoring, ratio=1, split_size=args.split_size, per_token=args.per_token)
    scored_corpus = ScoredCorpus.from_corpus_and_scores(corpus, scores)
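    # Note on `scores` (inferred from the per_token flag and the perplexity
    # math below, not from scorer documentation): without --per-token each
    # entry is a sentence-level sum of token log-probabilities, i.e. a
    # pseudo-log-likelihood (PLL) for masked LMs; with --per-token each entry
    # is a per-position list, which is why the perplexity block is skipped in
    # that case.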
    num_words_list, max_sent_len = corpus.get_num_words()
    if args.eos:
        logging.warning("Adding EOSes '.' to (P)PPL computation")
        num_words_list = [x + 1 for x in num_words_list]
    num_words_total = sum(num_words_list)
    if args.eos:
        logging.warning("# words (including EOS '.'): {}".format(num_words_total))
    else:
        logging.warning("# words: {}".format(num_words_total))
    logging.warning("longest sentence: {}".format(max_sent_len))
    num_toks_total = sum(true_tok_lens)
    if args.eos:
        logging.warning("# tokens (including EOS '.'): {}".format(num_toks_total))
    else:
        logging.warning("# tokens: {}".format(num_toks_total))
    if not args.per_token:
        plls = np.array(scores)
        pppl_tok_micro = np.exp(-plls.sum() / num_toks_total).item()
        logging.warning("Token-level (P)PPL: {}".format(pppl_tok_micro))
        # pppl_tok_macro = np.exp(- (plls / np.array(true_tok_lens)).mean())
        # logging.warning("Token-level (P)PPL, macro: {}".format(pppl_tok_macro))
        pppl_word_micro = math.exp((num_toks_total / num_words_total) * math.log(pppl_tok_micro))
        logging.warning("Word-normalized (P)PPL: {}".format(pppl_word_micro))
        # pppl_word_macro = np.exp(- (plls / np.array(num_words_list)).mean())
        # logging.warning("Word-normalized (P)PPL, macro: {}".format(pppl_word_macro))
    # === END SHARED COMPUTATION ===

    # How do we output?
    if args.mode == 'hyp':
        preds = scored_corpus.to_predictions()
        preds.to_json(sys.stdout)
    elif args.mode == 'ref':
        # Otherwise we just print a list of log-likelihoods
        scored_corpus.to_file(sys.stdout, scores_only=True)
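
# Example invocations (a sketch only: the flag spellings are assumed from the
# argparse attribute names used above, e.g. args.whole_word_mask ->
# --whole-word-mask, and the model name is illustrative):
#   mlm score --mode ref --model bert-base-en-cased --gpus 0 refs.txt > refs.scores
#   mlm score --mode hyp --model bert-base-en-cased --gpus 0 preds.json > preds.scored.json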