def cmd_score()

in src/mlm/cmds.py


def cmd_score(args: argparse.Namespace) -> None:
    """mlm score command
    """

    # Get model
    ctxs = setup_ctxs(args.gpus)
    weights_file = Path(args.weights) if isinstance(args.weights, str) else None
    # Redirect console output from GluonNLP downloading models
    with _stdout_to_stderr():
        model, vocab, tokenizer = get_pretrained(ctxs, args.model, weights_file, regression=args.no_mask)
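    # (when args.no_mask is set, get_pretrained loads a regression variant of
    #  the model, which is then matched by RegressionScorer below)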

    # Set scorer
    if MLMScorer._check_support(model):
        # GluonNLP
        scorer = MLMScorer(model, vocab, tokenizer, eos=args.eos, wwm=args.whole_word_mask, capitalize=args.capitalize, ctxs=ctxs)
    elif MLMScorerPT._check_support(model):
        # Transformers
        scorer = MLMScorerPT(model, vocab, tokenizer, eos=args.eos, wwm=args.whole_word_mask, capitalize=args.capitalize, ctxs=ctxs, lang=args.tgt)
    elif RegressionScorer._check_support(model):
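        # GluonNLP regression variant (pairs with regression=args.no_mask above)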
        scorer = RegressionScorer(model, vocab, tokenizer, eos=args.eos, wwm=args.whole_word_mask, capitalize=args.capitalize, ctxs=ctxs)
    elif LMScorer._check_support(model):
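        # Causal (left-to-right) LM; the masking options below do not apply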
        assert not args.whole_word_mask
        assert not args.no_mask
        scorer = LMScorer(model, vocab, tokenizer, eos=args.eos, capitalize=args.capitalize, ctxs=ctxs)
    else:
        raise ValueError(f"Model '{model.__class__.__name__}' not supported by any scorer.")

    # What data do we use?
    if args.mode == 'hyp':

        preds = Predictions.from_file(args.infile, max_utts=args.max_utts)
        # We 'flatten' the predictions into a corpus, for better batching
        corpus = preds.to_corpus()

        logging.warn("# of input sequences: {}".format(len(preds)))
        logging.warn("# of hypotheses: {}".format(len(corpus)))

    elif args.mode == 'ref':

        corpus = Corpus.from_file(args.infile, max_utts=args.max_utts)
        logging.warn("# sentences: {}".format(len(corpus)))

    # === START SHARED COMPUTATION ===

    # A scorer takes a corpus and produces a list of scores in order of the corpus
    if args.detok:
        corpus_for_scoring = corpus.detok_copy()
    else:
        corpus_for_scoring = corpus
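    # score() also returns each sentence's true token length, used for the
    # token-level (P)PPL normalization below; per_token=True yields
    # per-position scores instead of per-sentence sums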
    scores, true_tok_lens = scorer.score(corpus_for_scoring, ratio=1, split_size=args.split_size, per_token=args.per_token)
    scored_corpus = ScoredCorpus.from_corpus_and_scores(corpus, scores)

    num_words_list, max_sent_len = corpus.get_num_words()
    if args.eos:
        logging.warn("Adding EOSes '.' to (P)PPL computation")
        num_words_list = [x+1 for x in num_words_list]
    num_words_total = sum(num_words_list)
    if args.eos:
        logging.warn("# words (excluding EOS '.'): {}".format(num_words_total))
    else:
        logging.warn("# words: {}".format(num_words_total))
    logging.warn("longest sentence: {}".format(max_sent_len))

    num_toks_total = sum(true_tok_lens)
    if args.eos:
        logging.warn("# tokens (including EOS '.'): {}".format(num_toks_total))
    else:
        logging.warn("# tokens: {}".format(num_toks_total))


    if not args.per_token:
        plls = np.array(scores)
        pppl_tok_micro = np.exp(- plls.sum() / num_toks_total).item()
        logging.warn("Token-level (P)PPL: {}".format(pppl_tok_micro))

        # pppl_tok_macro = np.exp(- (plls / np.array(true_tok_lens)).mean())
        # logging.warn("Token-level (P)PPL, macro: {}".format(pppl_tok_macro))

        pppl_word_micro = math.exp((num_toks_total / num_words_total) * math.log(pppl_tok_micro))
        logging.warn("Word-normalized (P)PPL: {}".format(pppl_word_micro))

        # pppl_word_macro = np.exp(- (plls / np.array(num_words_list)).mean())
        # logging.warn("Word-normalized (P)PPL, macro: {}".format(pppl_word_macro))

    # === END SHARED COMPUTATION ===

    # How do we output?
    if args.mode == 'hyp':

        preds = scored_corpus.to_predictions()
        preds.to_json(sys.stdout)

    # otherwise we just print a list of log likelihoods
    elif args.mode == 'ref':

        scored_corpus.to_file(sys.stdout, scores_only=True)
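
For context, the scorers this command builds can also be driven directly from
Python rather than through the CLI. A minimal sketch, assuming the public API
exposed by mlm.models.get_pretrained and mlm.scorers.MLMScorer (the model name
here is illustrative; any name accepted by get_pretrained works):

    import mxnet as mx
    from mlm.models import get_pretrained
    from mlm.scorers import MLMScorer

    # One context per device; e.g., [mx.gpu(0), mx.gpu(1)] for two GPUs
    ctxs = [mx.cpu()]
    model, vocab, tokenizer = get_pretrained(ctxs, 'bert-base-en-cased')
    scorer = MLMScorer(model, vocab, tokenizer, ctxs=ctxs)
    # Returns one pseudo-log-likelihood per input sentence
    print(scorer.score_sentences(["Hello world!"]))

This mirrors the 'ref' path above: score_sentences wraps the corpus
construction and scorer.score() call that cmd_score performs on files.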