sourcecode/scoring/pflip_plus_model.py
"""Train and apply a model to predict which notes will lose CRH status.
This module implements a supervised model designed to be applied throughout the period
when a note enters or remains in status stabilization. The model predicts whether the note
will ultimately lock to CRH status or flip back to NMR. The features used by the model include:
* Note author and timing of the note in relation to the associated post.
* Individual user Helpfulness and tag ratings.
* Ratios of how common each tag is in ratings associated with the note.
* Aggregate statistics about rater factors (e.g. standard deviation of rater factors for
Helpful ratings, number of {Helpful, Somewhat Helpful, Not Helpful} ratings from users
with {positive, center, negative} factors, etc.).
* Burstiness of ratings (e.g. how many ratings occur soon after note creation, over short
periods of time or within the recent past).
* Creation, ratings and scoring outcomes of peer notes associated with the same post.
The training data includes notes that were CRH at some point and have either locked to
CRH or NMR, excluding any note that drifted to NMR status after locking. Notes
included in training must be associated with a post created after August 19, 2024, which
marks the point when usage of the timestampMillisOfFirstNmrDueToMinStableCrhTime changed
such that the value always reflects the timestamp when a note first entered stabilization
or NaN if the note never entered stabilization.
Since the model is designed to be applied throughout the entire stabilization period we include
each note twice, with features reflecting both when the note enters stabilization and when the
note achieves CRH status. If the note skips stabilization or exits stabilization without achieving
CRH status, then the note is included twice with identical features to maintain consistent
weighting across notes.
The feature extraction and model training are implemented entirely in scikit-learn.
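Example usage (illustrative sketch; the DataFrames are assumed to follow the schemas used
elsewhere in the scorer):
  model = PFlipPlusModel(seed=42)
  model.fit(notes, ratings, noteStatusHistory, prescoringRaterModelOutput)
  labels = model.predict(notes, ratings, noteStatusHistory, prescoringRaterModelOutput)
  # labels has one row per scored note with a predicted CRH or FLIP value in the LABEL column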
"""
# Standard libraries
from io import BytesIO
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
# Project libraries
from . import constants as c
from .enums import Scorers
from .pandas_utils import get_df_fingerprint
# 3rd party libraries
import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_selection import SelectPercentile, VarianceThreshold, chi2
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import auc as area_under_curve, confusion_matrix, roc_curve
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import (
FunctionTransformer,
KBinsDiscretizer,
OneHotEncoder,
PolynomialFeatures,
)
# Configure logger
logger = logging.getLogger("birdwatch.pflip_plus_model")
logger.setLevel(logging.INFO)
# Exposed constants
LABEL = "LABEL"
CRH = "CRH"
FLIP = "FLIP"
# Internal constants
_MAX = "MAX"
_MIN = "MIN"
_MIN_TWEET_ID = 1825679568688054351
# Internal column names
_RATER_FACTOR = "RATER_FACTOR"
_SCORING_CUTOFF_MTS = "SCORING_CUTOFF_MTS"
_SCORING_CUTOFF_MODE = "SCORING_CUTOFF_MODE"
_USER_HELPFULNESS_RATINGS = "USER_HELPFULNESS_RATINGS"
_USER_HELPFUL_TAGS = "USER_HELPFUL_TAGS"
_USER_NOT_HELPFUL_TAGS = "USER_NOT_HELPFUL_TAGS"
_NEGATIVE = "NEGATIVE"
_NEUTRAL = "NEUTRAL"
_POSITIVE = "POSITIVE"
_BUCKET_COUNT_COLS = [
f"{viewpoint}_{rating}"
for viewpoint in [_NEGATIVE, _NEUTRAL, _POSITIVE]
for rating in [c.helpfulValueTsv, c.somewhatHelpfulValueTsv, c.notHelpfulValueTsv]
]
_RATING_TIME_BUCKETS = [1, 5, 15, 60]
_QUICK_RATING_COLS = [f"FIRST_{cutoff}_TOTAL" for cutoff in _RATING_TIME_BUCKETS] + [
f"FIRST_{cutoff}_RATIO" for cutoff in _RATING_TIME_BUCKETS
]
_BURST_RATING_COLS = [f"BURST_{cutoff}_TOTAL" for cutoff in _RATING_TIME_BUCKETS] + [
f"BURST_{cutoff}_RATIO" for cutoff in _RATING_TIME_BUCKETS
]
_RECENT_RATING_COLS = [f"RECENT_{cutoff}_TOTAL" for cutoff in _RATING_TIME_BUCKETS] + [
f"RECENT_{cutoff}_RATIO" for cutoff in _RATING_TIME_BUCKETS
]
_NOTE_WRITING_LATENCY = "NOTE_WRITING_LATENCY"
_MAX_POS_HELPFUL = "MAX_POS_HELPFUL"
_MAX_NEG_HELPFUL = "MAX_NEG_HELPFUL"
_MEAN_POS_HELPFUL = "MEAN_POS_HELPFUL"
_MEAN_NEG_HELPFUL = "MEAN_NEG_HELPFUL"
_STD_HELPFUL = "STD_HELPFUL"
_MAX_DIFF = "MAX_DIFF"
_MEAN_DIFF = "MEAN_DIFF"
_STATS_COLS = [
_MAX_POS_HELPFUL,
_MAX_NEG_HELPFUL,
_MEAN_POS_HELPFUL,
_MEAN_NEG_HELPFUL,
_STD_HELPFUL,
_MAX_DIFF,
_MEAN_DIFF,
]
_NOTE_CREATION_MILLIS = "noteCreationMillis"
_TWEET_CREATION_MILLIS = "tweetCreationMillis"
_RATED_ON_NOTE_ID = "ratedOnNoteId"
_LOCAL = "LOCAL"
_PEER_MISLEADING = "PEER_MISLEADING"
_PEER_NON_MISLEADING = "PEER_NON_MISLEADING"
_TOTAL_PEER_NOTES = "TOTAL_PEER_NOTES"
_TOTAL_PEER_MISLEADING_NOTES = "TOTAL_PEER_MISLEADING_NOTES"
_TOTAL_PEER_NON_MISLEADING_NOTES = "TOTAL_PEER_NON_MISLEADING_NOTES"
_TOTAL_PEER_STABILIZATION_NOTES = "TOTAL_PEER_STABILIZATION_NOTES"
_TOTAL_PEER_CRH_NOTES = "TOTAL_PEER_CRH_NOTES"
# Define helper functions at module level so feature extraction pipeline doesn't require
# any lambda functions (and consequently can be pickled.)
def _identity(x: Any) -> Any:
"""Used to create modified preprocessing and tokenization for CountVectorizer."""
return x
def _feature_log(features: pd.Series) -> pd.Series:
"""Helper to log-scale features while allowing for zero valued features."""
return np.log(1 + features) / np.log(2)
def _get_timestamp_from_snowflake(snowflake: int) -> int:
"""Helper function to recover a timestamp from a snowflake ID."""
return (snowflake >> 22) + 1288834974657
def _set_to_list(series: pd.Series) -> List:
"""Helper function to convert a Series containing sets to lists."""
assert isinstance(series, pd.Series)
return [list(x) for x in series]
def _fill_na(series: pd.Series) -> np.ndarray:
return series.fillna(-1).values
def _reshape(series: pd.Series) -> np.ndarray:
assert isinstance(series, pd.Series)
return series.values.reshape(-1, 1)
class PFlipPlusModel(object):
def __init__(
self,
ratingRecencyCutoffMinutes: int = 15,
helpfulnessRaterMin: int = 1,
tagRaterMin: int = 5,
helpfulTagPercentile: int = 10,
notHelpfulTagPercentile: int = 50,
penalty: str = "l1",
C: float = 0.04,
maxIter: int = 2500,
verbose: int = 0,
classWeight: Dict[int, int] = {0: 1, 1: 1},
trainSize: float = 0.9,
solver: str = "liblinear",
tol: float = 1e-4,
seed: Optional[int] = None,
crhFpRate: float = 0.6,
tag_ratio_bins: int = 10,
rating_count_bins: int = 7,
factor_stats_bins: int = 10,
burst_bins: int = 5,
latency_bins: int = 10,
peer_note_count_bins: int = 4,
):
"""Configure PFlipModel.
Args:
ratingRecencyCutoffMinutes: Avoiding skew in training requires identifying the time at which
the note would have been scored in production and which notes / ratings would have been
available at that time. ratingRecencyCutoffMinutes specifies the assumed latency from
rating creation through to when a rating becomes available in final scoring (e.g. if we know
a note was set CRH at a particular point in time then we require ratings be created at least
ratingRecencyCutoffMinutes minutes earlier).
"""
self._pipeline: Optional[Pipeline] = None
self._predictionThreshold: Optional[float] = None
self._ratingRecencyCutoff = 1000 * 60 * ratingRecencyCutoffMinutes
self._helpfulnessRaterMin = helpfulnessRaterMin
self._tagRaterMin = tagRaterMin
self._helpfulTagPercentile = helpfulTagPercentile
self._notHelpfulTagPercentile = notHelpfulTagPercentile
self._penalty = penalty
self._C = C
self._maxIter = maxIter
self._verbose = verbose
self._classWeight = classWeight
self._solver = solver
self._tol = tol
self._trainSize = trainSize
self._seed = seed
self._crhFpRate = crhFpRate
self._tag_ratio_bins = tag_ratio_bins
self._rating_count_bins = rating_count_bins
self._factor_stats_bins = factor_stats_bins
self._burst_bins = burst_bins
self._latency_bins = latency_bins
self._peer_note_count_bins = peer_note_count_bins
self._column_thresholds: Dict[str, float] = dict()
def _get_notes(
self,
notes: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
):
"""Determine notes to include in scoring and associated metadata.
To be included, notes must have an associated tweet and classification. Returned columns
include: noteId, tweetId, classification, noteCreationMillis, tweetCreationMillis
Args:
notes: Input DataFrame containing creation times.
noteStatusHistory: Input DataFrame containing creation times.
"""
# Validate that "notes" contains a subset of noteStatusHistory.
assert notes[c.noteIdKey].nunique() == len(notes), "notes contains duplicate noteIds"
assert noteStatusHistory[c.noteIdKey].nunique() == len(
noteStatusHistory
), "noteStatusHistory contains duplicate noteIds"
assert len(notes) == len(
noteStatusHistory[[c.noteIdKey, c.createdAtMillisKey]].merge(
notes[[c.noteIdKey, c.createdAtMillisKey]],
on=[c.noteIdKey, c.createdAtMillisKey],
how="inner",
)
), "notes is not a subset of noteStatusHistory"
# Develop set of candidate notes
candidateNotes = noteStatusHistory[[c.noteIdKey, c.createdAtMillisKey]].rename(
columns={c.createdAtMillisKey: _NOTE_CREATION_MILLIS}
)
candidateNotes = candidateNotes.merge(notes[[c.noteIdKey, c.tweetIdKey, c.classificationKey]])
candidateNotes[_TWEET_CREATION_MILLIS] = [
_get_timestamp_from_snowflake(tweetId) for tweetId in candidateNotes[c.tweetIdKey]
]
# Prune candidates to require that associated tweet and classification are available, and possibly
# that associated tweet is recent.
candidateNotes = candidateNotes[
(candidateNotes[c.tweetIdKey] > 0) & (candidateNotes[c.classificationKey].notna())
]
return candidateNotes
def _compute_scoring_cutoff(
self, noteStatusHistory: pd.DataFrame, ratings: pd.DataFrame, mode: str, minRatings: int = 5
) -> pd.DataFrame:
"""Compute time limits on which ratings to include for each note.
Recall that some notes may enter stabilization and then later exit to CRH, some may enter stabilization
and exit to NMR, and some may skip stabilization and go directly to CRH. We support taking the min
and max of the two timestamps so that we have a datapoint for notes both at the beginning and end of
any stabilization period. For notes that skip stabilization or never go CRH, this means the note is
included twice with the same features, which is OK since it ensures weight parity with notes that
are included twice with different features. We do want to capture notes at both the beginning and
end of the stabilization period so that the model performs well throughout the entire stabilization
period.
Notes that never enter stabilization or go CRH will be dropped because they should not be included
in training.
Args:
noteStatusHistory: pd.DataFrame used to determine time of the first CRH status,
if applicable.
mode: whether to return the max or min of timestampMillisOfFirstNmrDueToMinStableCrhTime
and timestampMillisOfNoteFirstNonNMRLabel
Returns:
pd.DataFrame with noteId, SCORING_CUTOFF_MTS and SCORING_CUTOFF_MODE columns
"""
# Select columns and prune to notes that either entered stabilization or went CRH. Note that
# the goal here is to not set a rating cutoff for notes that went CRNH since the model won't
# be applied to these notes in prod.
scoringCutoff = noteStatusHistory[
[
c.noteIdKey,
c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey,
c.timestampMillisOfNoteFirstNonNMRLabelKey,
c.firstNonNMRLabelKey,
]
].copy()
scoringCutoff = scoringCutoff[
(scoringCutoff[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey].notna())
| (scoringCutoff[c.firstNonNMRLabelKey] == c.currentlyRatedHelpful)
].drop(columns=c.firstNonNMRLabelKey)
# Set cutoff timestamps
if mode == _MAX:
cutoffs = scoringCutoff[
[
c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey,
c.timestampMillisOfNoteFirstNonNMRLabelKey,
]
].max(axis=1)
else:
assert mode == _MIN, f"unexpected mode: {mode}"
cutoffs = scoringCutoff[
[
c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey,
c.timestampMillisOfNoteFirstNonNMRLabelKey,
]
].min(axis=1)
# cutoffs doesn't contain any NaN values and has float type, but Pandas generates an
# error when casting directly to Int64Dtype. There is no error if cast to int64 first.
assert cutoffs.dtype == np.float64
assert cutoffs.isna().sum() == 0
scoringCutoff[_SCORING_CUTOFF_MTS] = cutoffs.astype(np.int64).astype(pd.Int64Dtype())
scoringCutoff[_SCORING_CUTOFF_MTS] = (
scoringCutoff[_SCORING_CUTOFF_MTS] - self._ratingRecencyCutoff
)
# Compute cutoff that is guaranteed to include at least 5 ratings
ratings = ratings[[c.noteIdKey, c.createdAtMillisKey]]
cutoffByRatings = (
ratings.groupby(c.noteIdKey)
.max()
.reset_index(drop=False)
.rename(columns={c.createdAtMillisKey: "maxRatingMts"})
.astype(pd.Int64Dtype())
)
cutoffByRatings = cutoffByRatings.merge(
ratings.sort_values(c.createdAtMillisKey, ascending=True)
.groupby(c.noteIdKey)
.nth(minRatings - 1)
.rename(columns={c.createdAtMillisKey: "nthRatingMts"})
.astype(pd.Int64Dtype()),
how="left",
)
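# ratingMin falls back to the final rating timestamp when fewer than minRatings ratings exist;
# taking the max with the time-based cutoff below guarantees that at least minRatings ratings
# (or every available rating) are admitted.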
cutoffByRatings["ratingMin"] = cutoffByRatings[["maxRatingMts", "nthRatingMts"]].min(axis=1)
# Merge cutoffs by time and by ratings
beforeMerge = len(scoringCutoff)
scoringCutoff = scoringCutoff.merge(cutoffByRatings[[c.noteIdKey, "ratingMin"]])
assert len(scoringCutoff) == beforeMerge
scoringCutoff[_SCORING_CUTOFF_MTS] = scoringCutoff[[_SCORING_CUTOFF_MTS, "ratingMin"]].max(
axis=1
)
scoringCutoff[_SCORING_CUTOFF_MODE] = mode
return scoringCutoff[[c.noteIdKey, _SCORING_CUTOFF_MTS, _SCORING_CUTOFF_MODE]]
def _label_notes(
self,
noteStatusHistory: pd.DataFrame,
) -> pd.DataFrame:
"""Generate a DataFrame mapping noteIds to labels.
We define a CRH note as any note that is locked to CRH, and a FLIP note as any note
that was scored as CRH at some point but has since locked to NMR. Note that we exclude
notes that are locked to CRH but decided by ScoringDriftGuard, since that indicates the
model wanted to score the note as NMR (and therefore it is unclear whether the note is
best labeled CRH or FLIP).
Args:
noteStatusHistory: pd.DataFrame used to determine locked status and whether there was
a prior CRH status.
Returns:
pd.DataFrame with noteId and LABEL columns
"""
# Assemble relevant data for labeling
labels = noteStatusHistory[
[
c.noteIdKey,
# If set, implies note was on track to be CRH at some point
c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey,
# If set to CRH, implies note was actually CRH at some point
c.firstNonNMRLabelKey,
# Use to determine final status and whether note is locked
c.lockedStatusKey,
# If set to ScoringDriftGuard, indicates note may be prone to flipping
c.currentDecidedByKey,
]
].copy()
labels[LABEL] = pd.NA
labels.loc[(labels[c.lockedStatusKey] == c.currentlyRatedHelpful), LABEL] = CRH
labels.loc[
(labels[c.firstNonNMRLabelKey] == c.currentlyRatedHelpful)
& (labels[c.lockedStatusKey].isin({c.needsMoreRatings, c.currentlyRatedNotHelpful})),
LABEL,
] = FLIP
labels.loc[
(~labels[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey].isna())
& (labels[c.firstNonNMRLabelKey].isna())
& (labels[c.lockedStatusKey].isin({c.needsMoreRatings, c.currentlyRatedNotHelpful})),
LABEL,
] = FLIP
labels = labels.dropna(subset=LABEL)
logger.info(f"labels before ScoringDriftGuard:\n{labels[LABEL].value_counts(dropna=False)}")
# Note that we don't exclude notes decided by ScoringDriftGuard when a note locks to NMR
# after being CRH and is now decided by ScoringDriftGuard (implying the note was once again
# scored as CRH) because in that case the note is labeled as FLIP and the involvement of
# ScoringDriftGuard only provides further evidence that the note flips status.
dropRows = (labels[LABEL] == CRH) & (
np.array(
[decider.startswith("ScoringDriftGuard") for decider in labels[c.currentDecidedByKey]]
)
)
labels = labels[~dropRows][[c.noteIdKey, LABEL]]
logger.info(f"labels after ScoringDriftGuard:\n{labels[LABEL].value_counts(dropna=False)}")
return labels
def _compute_rater_factors(self, prescoringRaterModelOutput: pd.DataFrame) -> pd.DataFrame:
"""Generate a DataFrame mapping raterParticipantIds to factors.
Each rater is assigned their factor from either Core, Expansion or ExpansionPlus,
prioritizing Core, then Expansion, then ExpansionPlus.
Args:
prescoringRaterModelOutput: pd.DataFrame used to determine rater factors.
Returns:
pd.DataFrame with raterParticipantId and RATER_FACTOR columns
"""
# Obtain prescoring rater factors
coreFactors = prescoringRaterModelOutput[
prescoringRaterModelOutput[c.scorerNameKey] == Scorers.MFCoreScorer.name
][[c.raterParticipantIdKey, c.internalRaterFactor1Key]].rename(
columns={c.internalRaterFactor1Key: c.coreRaterFactor1Key}
)
expansionFactors = prescoringRaterModelOutput[
prescoringRaterModelOutput[c.scorerNameKey] == Scorers.MFExpansionScorer.name
][[c.raterParticipantIdKey, c.internalRaterFactor1Key]].rename(
columns={c.internalRaterFactor1Key: c.expansionRaterFactor1Key}
)
expansionPlusFactors = prescoringRaterModelOutput[
prescoringRaterModelOutput[c.scorerNameKey] == Scorers.MFExpansionPlusScorer.name
][[c.raterParticipantIdKey, c.internalRaterFactor1Key]].rename(
columns={c.internalRaterFactor1Key: c.expansionPlusRaterFactor1Key}
)
# Combine and prioritize factors
raterFactors = coreFactors.merge(expansionFactors, how="outer").merge(
expansionPlusFactors, how="outer"
)
raterFactors[_RATER_FACTOR] = raterFactors[c.expansionPlusRaterFactor1Key]
raterFactors.loc[
~raterFactors[c.expansionRaterFactor1Key].isna(), _RATER_FACTOR
] = raterFactors[c.expansionRaterFactor1Key]
raterFactors.loc[~raterFactors[c.coreRaterFactor1Key].isna(), _RATER_FACTOR] = raterFactors[
c.coreRaterFactor1Key
]
return raterFactors[[c.raterParticipantIdKey, _RATER_FACTOR]]
def _prepare_local_ratings(
self,
ratings: pd.DataFrame,
scoredNotes: pd.DataFrame,
) -> pd.DataFrame:
"""Filter ratings DF to only include ratings on the notes being scored.
Args:
ratings: pd.DataFrame containing all ratings used in scoring
scoredNotes: pd.DataFrame specifying which notes are actually being scored
"""
return ratings.merge(scoredNotes[[c.noteIdKey]])
def _prepare_peer_ratings(
self, ratings: pd.DataFrame, notes: pd.DataFrame, scoredNotes: pd.DataFrame
) -> pd.DataFrame:
"""Construct a ratings DF that captures ratings on peer notes.
Peer notes are defined as different notes that are on the same post. We consider the
ratings for the peer note to provide signal for the note being scored (e.g. if another
note received many "note not needed" ratings, then it is more likely that the post
itself does not need a note).
Since each unique {note, peer} pair is a row in the output dataframe, the column name
for the note being scored remains "noteId" and the note which originally received the
rating is the "ratedOnNoteId".
Args:
ratings: pd.DataFrame containing all ratings used in scoring
notes: pd.DataFrame relating notes to posts
scoredNotes: pd.DataFrame specifying which notes are actually being scored
"""
# Augment ratings with tweetId and classification
beforeMerge = len(ratings)
ratings = ratings.merge(notes[[c.noteIdKey, c.tweetIdKey, c.classificationKey]])
assert len(ratings) == beforeMerge
ratings = ratings.rename(columns={c.noteIdKey: _RATED_ON_NOTE_ID})
# Create one copy of each rating for each scored note on the post, excluding
# ratings that occur on that post itself
assert len(scoredNotes) == len(scoredNotes[[c.noteIdKey, c.tweetIdKey]].drop_duplicates())
ratings = ratings.merge(scoredNotes[[c.noteIdKey, c.tweetIdKey]], on=c.tweetIdKey)
ratings = ratings[
ratings[c.noteIdKey] != ratings[_RATED_ON_NOTE_ID]
] # throw out ratings that occur on the note itself
ratings = ratings.drop(columns={c.tweetIdKey})
return ratings
def _apply_cutoff(self, ratings: pd.DataFrame, scoredNotes: pd.DataFrame) -> pd.DataFrame:
"""Filter a ratings DF to only contain ratings before an applicable cutoff.
Args:
ratings: pd.DataFrame containing peer and/or local ratings.
scoredNotes: pd.DataFrame specifying scoring cutoff timestamps
"""
assert scoredNotes[_SCORING_CUTOFF_MTS].isna().sum() == 0
beforeMerge = len(ratings)
ratings = ratings.merge(scoredNotes[[c.noteIdKey, _SCORING_CUTOFF_MTS]])
assert len(ratings) == beforeMerge
return ratings[ratings[c.createdAtMillisKey] <= ratings[_SCORING_CUTOFF_MTS]].drop(
columns=_SCORING_CUTOFF_MTS
)
def _get_note_writing_latency(self, notes: pd.DataFrame) -> pd.DataFrame:
"""Identify the time in milliseconds from post to note creation."""
notes = notes[[c.noteIdKey, _NOTE_CREATION_MILLIS, _TWEET_CREATION_MILLIS]].copy()
notes[_NOTE_WRITING_LATENCY] = notes[_NOTE_CREATION_MILLIS] - notes[_TWEET_CREATION_MILLIS]
return notes[[c.noteIdKey, _NOTE_WRITING_LATENCY]]
def _get_quick_rating_stats(self, notes: pd.DataFrame, ratings: pd.DataFrame) -> pd.DataFrame:
"""Return counts and ratios of how many ratings occurred in the first 1/5/15/60 minutes.
Args:
notes: DF specifying note creation timestamps.
ratings: DF specifying local rating timestamps.
"""
ratingTotals = (
ratings[[c.noteIdKey]]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": "total"})
)
ratingTotals = notes[[c.noteIdKey]].merge(ratingTotals, how="left")
ratingTotals = ratingTotals.fillna({"total": 0}).astype(pd.Int64Dtype())
for cutoff in _RATING_TIME_BUCKETS:
beforeCutoff = ratings[[c.noteIdKey, c.createdAtMillisKey]].rename(
columns={c.createdAtMillisKey: "ratingCreationMts"}
)
beforeCutoff = beforeCutoff.merge(notes[[c.noteIdKey, _NOTE_CREATION_MILLIS]])
beforeCutoff = beforeCutoff[
beforeCutoff["ratingCreationMts"]
< (beforeCutoff[_NOTE_CREATION_MILLIS] + (1000 * 60 * cutoff))
]
cutoffCount = (
beforeCutoff[[c.noteIdKey]]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": f"FIRST_{cutoff}_TOTAL"})
)
ratingTotals = ratingTotals.merge(cutoffCount, how="left").fillna(0)
ratingTotals = ratingTotals.astype(pd.Int64Dtype())
for cutoff in _RATING_TIME_BUCKETS:
ratingTotals[f"FIRST_{cutoff}_RATIO"] = ratingTotals[f"FIRST_{cutoff}_TOTAL"] / (
ratingTotals["total"].clip(lower=1)
)
return ratingTotals[[c.noteIdKey] + _QUICK_RATING_COLS]
def _get_burst_rating_stats(self, notes: pd.DataFrame, ratings: pd.DataFrame) -> pd.DataFrame:
"""Return counts and ratios of the max ratings in 1/5/15/60 minute windows.
Args:
notes: DF specifying the universe of notes to include.
ratings: DF specifying local rating timestamps.
"""
ratingTotals = (
ratings[[c.noteIdKey]]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": "total"})
)
initialNotes = len(notes)
ratingTotals = notes[[c.noteIdKey]].merge(ratingTotals, how="left")
ratingTotals = ratingTotals.fillna({"total": 0}).astype(pd.Int64Dtype())
for cutoff in _RATING_TIME_BUCKETS:
ratingCounts = []
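# Approximate the busiest sliding window of `cutoff` minutes: bucket rating timestamps into
# `cutoff`-minute bins at each of `cutoff` one-minute offsets, then take the max bin count.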
for offset in range(cutoff):
offsetRatings = ratings[[c.noteIdKey, c.createdAtMillisKey]].copy()
offsetRatings[c.createdAtMillisKey] = (
offsetRatings[c.createdAtMillisKey] + (1000 * 60 * offset)
) // (1000 * 60 * cutoff)
offsetRatings = (
offsetRatings.value_counts()
.to_frame()
.reset_index(drop=False)[[c.noteIdKey, "count"]]
.groupby(c.noteIdKey)
.max()
.reset_index(drop=False)
)
ratingCounts.append(offsetRatings)
ratingCounts = (
pd.concat(ratingCounts)
.groupby(c.noteIdKey)
.max()
.reset_index(drop=False)
.rename(columns={"count": f"BURST_{cutoff}_TOTAL"})
).astype(pd.Int64Dtype())
ratingTotals = ratingTotals.merge(ratingCounts, how="left").fillna(
{f"BURST_{cutoff}_TOTAL": 0}
)
ratingTotals[f"BURST_{cutoff}_RATIO"] = ratingTotals[f"BURST_{cutoff}_TOTAL"] / (
ratingTotals["total"].clip(lower=1)
)
assert (
len(ratingTotals) == initialNotes
), f"unexpected length mismatch: {len(ratingTotals)} vs. {initialNotes}"
return ratingTotals[[c.noteIdKey] + _BURST_RATING_COLS]
def _get_recent_rating_stats(
self, scoredNotes: pd.DataFrame, ratings: pd.DataFrame, prepareForTraining: bool
):
"""Generate counts of ratings within the last 1/5/15/20 minutes.
Note that this process must work differently during training and in production, since
during training future ratings will be available. We solve this by treating the
_SCORING_CUTOFF_MTS as the effective time at which scoring occurred, and include ratings
based on that timestamp.
Args:
scoredNotes: pd.DataFrame specifying scoring cutoff timestamps
ratings: pd.DataFrame containing local ratings.
prepareForTraining: bool specifying whether to prune ratings.
"""
# Define notion of effective present for each rating
initialRatings = len(ratings)
ratings = ratings[[c.noteIdKey, c.createdAtMillisKey]].copy()
if prepareForTraining:
ratings = ratings.merge(
scoredNotes[[c.noteIdKey, _SCORING_CUTOFF_MTS]].rename(
columns={_SCORING_CUTOFF_MTS: "effectivePresent"}
)
)
else:
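# In production, treat the timestamp of the newest available rating as the effective present.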
ratings["effectivePresent"] = ratings[c.createdAtMillisKey].max()
assert len(ratings) == initialRatings
assert (ratings[c.createdAtMillisKey] > ratings["effectivePresent"]).sum() == 0
assert ratings[c.createdAtMillisKey].isna().sum() == 0
assert ratings["effectivePresent"].isna().sum() == 0
# Develop counts and ratios of recent ratings in specific time ranges
ratingTotals = (
ratings[[c.noteIdKey]]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": "total"})
)
ratingTotals = scoredNotes[[c.noteIdKey]].merge(ratingTotals, how="left")
ratingTotals = ratingTotals.fillna({"total": 0}).astype(pd.Int64Dtype())
for cutoff in _RATING_TIME_BUCKETS:
afterCutoff = ratings[
ratings[c.createdAtMillisKey] > (ratings["effectivePresent"] - (1000 * 60 * cutoff))
]
cutoffCount = (
afterCutoff[[c.noteIdKey]]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": f"RECENT_{cutoff}_TOTAL"})
)
ratingTotals = ratingTotals.merge(cutoffCount, how="left").fillna(0)
ratingTotals = ratingTotals.astype(pd.Int64Dtype())
for cutoff in _RATING_TIME_BUCKETS:
ratingTotals[f"RECENT_{cutoff}_RATIO"] = ratingTotals[f"RECENT_{cutoff}_TOTAL"] / (
ratingTotals["total"].clip(lower=1)
)
return ratingTotals[[c.noteIdKey] + _RECENT_RATING_COLS]
def _get_helpfulness_ratings(self, notes: pd.DataFrame, ratings: pd.DataFrame) -> pd.DataFrame:
"""Return a DataFrame with one row per note and a column with a nested list of helpfulness ratings.
Args:
notes: pd.DataFrame used to determine full set of notes
ratings: pd.DataFrame containing ratings that should contribute to model features.
Returns:
pd.DataFrame with two columns: noteIds and all user helpfulness ratings on the note.
"""
raters = ratings[c.raterParticipantIdKey].astype(str)
helpfulnessRatings = ratings[[c.noteIdKey]].copy()
helpfulnessRatings.loc[:, _USER_HELPFULNESS_RATINGS] = (
raters + ":" + ratings[c.helpfulnessLevelKey].astype(str)
)
helpfulnessRatings = (
helpfulnessRatings[[c.noteIdKey, _USER_HELPFULNESS_RATINGS]]
.groupby(c.noteIdKey)
.agg(set)
.reset_index(drop=False)
)
helpfulnessRatings = notes.merge(helpfulnessRatings, how="left")
helpfulnessRatings[_USER_HELPFULNESS_RATINGS] = helpfulnessRatings[
_USER_HELPFULNESS_RATINGS
].apply(lambda d: d if isinstance(d, set) else {})
return helpfulnessRatings
def _get_tag_ratios(self, notes: pd.DataFrame, ratings: pd.DataFrame) -> pd.DataFrame:
"""Produce a DataFrame specifying the ratio of ratings for each note that included each tag.
Args:
notes: pd.DataFrame used to specify the universe of all notes to include.
ratings: pd.DataFrame containing all ratings for feature extraction.
Returns:
pd.DataFrame containing one row per note and one column per rating tag.
"""
tags = ratings[[c.noteIdKey] + c.helpfulTagsTSVOrder + c.notHelpfulTagsTSVOrder].copy()
total_ratings = "total_ratings"
tags[total_ratings] = 1
tags = tags.groupby(c.noteIdKey).sum().reset_index(drop=False)
tags[c.helpfulTagsTSVOrder + c.notHelpfulTagsTSVOrder] = tags[
c.helpfulTagsTSVOrder + c.notHelpfulTagsTSVOrder
].divide(tags[total_ratings], axis=0)
tags = notes[[c.noteIdKey]].merge(tags.drop(columns=total_ratings), how="left")
return tags[[c.noteIdKey] + c.helpfulTagsTSVOrder + c.notHelpfulTagsTSVOrder]
def _get_user_tag_ratings(
self,
notes: pd.DataFrame,
ratings: pd.DataFrame,
outCol: str,
tagCols: List[str],
) -> pd.DataFrame:
"""Return a DataFrame with one row per note and a column with a nested list of user rating tags.
Args:
notes: pd.DataFrame used to specify the universe of all notes to include.
ratings: pd.DataFrame containing all ratings for feature extraction.
outCol: str identifying output column to contain list of user tag ratings
tagCols: List of tag columns to include in outCol
Returns:
pd.DataFrame containing one row per note and one column containing all user rating tags.
"""
ratingTags = ratings[[c.noteIdKey, c.raterParticipantIdKey] + tagCols].copy()
tagStrs = np.array(tagCols)
ratingTags[outCol] = [
{f"{rater}:{tag}" for tag in tagStrs[np.where(row)[0]]}
for rater, row in zip(ratingTags[c.raterParticipantIdKey], ratingTags[tagCols].values)
]
ratingTags = (
ratingTags[[c.noteIdKey, outCol]]
.groupby(c.noteIdKey)
.agg(lambda x: set().union(*x))
.reset_index(drop=False)
)
ratingTags = notes[[c.noteIdKey]].merge(ratingTags, how="left")
ratingTags[outCol] = ratingTags[outCol].apply(
lambda userTags: userTags if isinstance(userTags, set) else {}
)
return ratingTags
def _get_bucket_count_totals(self, notes: pd.DataFrame, ratings: pd.DataFrame) -> pd.DataFrame:
"""Returns a DataFrame with one row per note and 9 columns containing buckets of rating counts.
Args:
notes: pd.DataFrame used to specify the universe of all notes to include.
ratings: pd.DataFrame containing all ratings for feature extraction.
Returns:
pd.DataFrame containing one row per note and 9 columns of rating counts, bucketed by rater factor viewpoint and helpfulness level.
"""
summary = ratings[[c.noteIdKey, _RATER_FACTOR, c.helpfulnessLevelKey]].copy()
summary = summary[~summary[_RATER_FACTOR].isna()]
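# Bucket raters into negative / neutral / positive viewpoints using a +/-0.3 factor threshold,
# then count Helpful / Somewhat Helpful / Not Helpful ratings within each viewpoint bucket.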
summary[_NEGATIVE] = summary[_RATER_FACTOR] < -0.3
summary[_NEUTRAL] = (summary[_RATER_FACTOR] >= -0.3) & (summary[_RATER_FACTOR] <= 0.3)
summary[_POSITIVE] = summary[_RATER_FACTOR] > 0.3
summary[c.helpfulValueTsv] = summary[c.helpfulnessLevelKey] == c.helpfulValueTsv
summary[c.somewhatHelpfulValueTsv] = summary[c.helpfulnessLevelKey] == c.somewhatHelpfulValueTsv
summary[c.notHelpfulValueTsv] = summary[c.helpfulnessLevelKey] == c.notHelpfulValueTsv
for viewpoint in [_NEGATIVE, _NEUTRAL, _POSITIVE]:
for rating in [c.helpfulValueTsv, c.somewhatHelpfulValueTsv, c.notHelpfulValueTsv]:
summary[f"{viewpoint}_{rating}"] = summary[viewpoint].multiply(summary[rating])
summary = summary[[c.noteIdKey] + _BUCKET_COUNT_COLS]
summary = summary.groupby(c.noteIdKey).sum().reset_index(drop=False)
summary[_BUCKET_COUNT_COLS] = summary[_BUCKET_COUNT_COLS].astype(np.float64)
summary = (
notes[[c.noteIdKey]]
.merge(summary, on=c.noteIdKey, how="left")
.fillna(0.0)
.astype(pd.Int64Dtype())
)
return summary
def _get_helpful_rating_stats(self, notes: pd.DataFrame, ratings: pd.DataFrame) -> pd.DataFrame:
"""Compute aggregate statistics about the Helpful ratings on a note.
Args:
notes: pd.DataFrame used to specify the universe of all notes to include.
ratings: pd.DataFrame containing all ratings for feature extraction.
Returns:
pd.DataFrame with one row per note and 7 columns for aggregate statistics about rater
factors for Helpful ratings.
"""
# Prune ratings to only include Helpful ratings from users with a factor
ratings = ratings[[c.noteIdKey, _RATER_FACTOR, c.helpfulnessLevelKey]].copy()
ratings = ratings[~ratings[_RATER_FACTOR].isna()]
ratings = ratings[ratings[c.helpfulnessLevelKey] == c.helpfulValueTsv]
ratings = ratings.drop(columns=c.helpfulnessLevelKey)
# Compute rating stats
maxPosHelpful = (
ratings[ratings[_RATER_FACTOR] > 0]
.groupby(c.noteIdKey)
.max()
.reset_index()
.rename(columns={_RATER_FACTOR: _MAX_POS_HELPFUL})
)
maxNegHelpful = (
ratings[ratings[_RATER_FACTOR] <= 0]
.groupby(c.noteIdKey)
.min()
.abs()
.reset_index()
.rename(columns={_RATER_FACTOR: _MAX_NEG_HELPFUL})
)
meanPosHelpful = (
ratings[ratings[_RATER_FACTOR] > 0]
.groupby(c.noteIdKey)
.mean()
.reset_index()
.rename(columns={_RATER_FACTOR: _MEAN_POS_HELPFUL})
)
meanNegHelpful = (
ratings[ratings[_RATER_FACTOR] <= 0]
.groupby(c.noteIdKey)
.mean()
.abs()
.reset_index()
.rename(columns={_RATER_FACTOR: _MEAN_NEG_HELPFUL})
)
stdHelpful = (
ratings.groupby(c.noteIdKey).std().reset_index().rename(columns={_RATER_FACTOR: _STD_HELPFUL})
)
# Compile into features per-note
notes = notes[[c.noteIdKey]].merge(maxPosHelpful, on=c.noteIdKey, how="left")
notes = notes.merge(maxNegHelpful, on=c.noteIdKey, how="left")
notes = notes.merge(meanPosHelpful, on=c.noteIdKey, how="left")
notes = notes.merge(meanNegHelpful, on=c.noteIdKey, how="left")
notes = notes.merge(stdHelpful, on=c.noteIdKey, how="left")
notes[_MAX_DIFF] = notes[_MAX_POS_HELPFUL] + notes[_MAX_NEG_HELPFUL]
notes[_MEAN_DIFF] = notes[_MEAN_POS_HELPFUL] + notes[_MEAN_NEG_HELPFUL]
return notes
def _make_note_info_from_ratings(
self,
notes: pd.DataFrame,
ratings: pd.DataFrame,
) -> pd.DataFrame:
"""Generate features derived from peer or local ratings associated with a note.
Generated features include user helpfulness and tag ratings, buckets of rating counts,
stats about rater factor distributions and ratios of tags across ratings.
Args:
notes: DF specifying which notes should be included in the output
ratings: DF containing peer or local ratings to base the features on
"""
# Augment notes with features. Note that attributes of the note (e.g. author,
# creation time) should always be available because we filter to notes with the creation time
# in the last year, inherently removing any deleted notes where the creation time is unavailable.
helpfulnessRatings = self._get_helpfulness_ratings(notes[[c.noteIdKey]], ratings)
notes = notes.merge(helpfulnessRatings, how="inner")
userHelpfulTags = self._get_user_tag_ratings(
notes[[c.noteIdKey]], ratings, _USER_HELPFUL_TAGS, c.helpfulTagsTSVOrder
)
notes = notes.merge(userHelpfulTags, how="inner")
userNotHelpfulTags = self._get_user_tag_ratings(
notes[[c.noteIdKey]], ratings, _USER_NOT_HELPFUL_TAGS, c.notHelpfulTagsTSVOrder
)
notes = notes.merge(userNotHelpfulTags, how="inner")
bucketCounts = self._get_bucket_count_totals(notes[[c.noteIdKey]], ratings)
notes = notes.merge(bucketCounts, how="inner")
helpfulStats = self._get_helpful_rating_stats(notes[[c.noteIdKey]], ratings)
notes = notes.merge(helpfulStats, how="inner")
tagRatios = self._get_tag_ratios(notes[[c.noteIdKey]], ratings)
notes = notes.merge(tagRatios, how="inner")
return notes
def _get_note_counts(
self,
scoredNotes: pd.DataFrame,
notes: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
prepareForTraining: bool,
) -> pd.DataFrame:
"""Return counts of relevant events on peer notes.
Counts should include: number of misleading and non-misleading notes created on the same post,
number of misleading notes on the same post that went CRH and ratios of the above values.
Note that aggregation works differently during training and in production, since training
effectively has access to data from the future.
Args:
scoredNotes: DF containing {note, tweet} associations and effective scoring times for training.
notes: DF containing set of all notes, not just those that are being scored.
noteStatusHistory: DF specifying key scoring timestamps for peer notes.
prepareForTraining: whether notes should be pruned to discard future data.
"""
# Merge inputs into a single dataframe with relevant info
assert scoredNotes[c.tweetIdKey].min() > 0 # tweet should be set for all notes being scored
peerNotes = scoredNotes[[c.noteIdKey, c.tweetIdKey]].merge(
notes[[c.tweetIdKey, c.noteIdKey, c.classificationKey]].rename(
columns={c.noteIdKey: "peerNoteId"}
)
)
assert len(scoredNotes) == peerNotes[c.noteIdKey].nunique()
assert peerNotes[c.classificationKey].isna().sum() == 0
peerNotes = peerNotes.merge(noteStatusHistory[[c.noteIdKey, c.createdAtMillisKey]])
peerNotes = peerNotes.merge(
noteStatusHistory[
[
c.noteIdKey,
c.createdAtMillisKey,
c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey,
c.timestampMillisOfNoteFirstNonNMRLabelKey,
c.firstNonNMRLabelKey,
]
].rename(columns={c.noteIdKey: "peerNoteId", c.createdAtMillisKey: "peerCreatedAtMillis"})
)
assert len(scoredNotes) == peerNotes[c.noteIdKey].nunique()
# If we are in training, prune scope of what info should be available
assert (_SCORING_CUTOFF_MTS in scoredNotes) == prepareForTraining
if _SCORING_CUTOFF_MTS in scoredNotes:
peerNotes = peerNotes.merge(scoredNotes[[c.noteIdKey, _SCORING_CUTOFF_MTS]])
# Prune any notes created after the cutoff
peerNotes = peerNotes[peerNotes["peerCreatedAtMillis"] <= peerNotes[_SCORING_CUTOFF_MTS]]
# Null any scoring events that happened after the cutoff
peerNotes.loc[
peerNotes[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey]
> peerNotes[_SCORING_CUTOFF_MTS],
c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey,
] = np.nan
peerNotes.loc[
peerNotes[c.timestampMillisOfNoteFirstNonNMRLabelKey] > peerNotes[_SCORING_CUTOFF_MTS],
[c.timestampMillisOfNoteFirstNonNMRLabelKey, c.firstNonNMRLabelKey],
] = np.nan
# Compute totals
peerNotes = peerNotes[peerNotes[c.noteIdKey] != peerNotes["peerNoteId"]]
totalPeerNotes = (
peerNotes[c.noteIdKey]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": _TOTAL_PEER_NOTES})
)
totalPeerMisleadingNotes = (
peerNotes[peerNotes[c.classificationKey] == c.notesSaysTweetIsMisleadingKey][c.noteIdKey]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": _TOTAL_PEER_MISLEADING_NOTES})
)
totalPeerNotMisleadingNotes = (
peerNotes[peerNotes[c.classificationKey] == c.noteSaysTweetIsNotMisleadingKey][c.noteIdKey]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": _TOTAL_PEER_NON_MISLEADING_NOTES})
)
totalPeerStabilizationNotes = (
peerNotes[
(peerNotes[c.classificationKey] == c.notesSaysTweetIsMisleadingKey)
& (peerNotes[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey].notna())
][c.noteIdKey]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": _TOTAL_PEER_STABILIZATION_NOTES})
)
totalPeerCrhNotes = (
peerNotes[
(peerNotes[c.classificationKey] == c.notesSaysTweetIsMisleadingKey)
& (peerNotes[c.firstNonNMRLabelKey] == c.currentlyRatedHelpful)
][c.noteIdKey]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": _TOTAL_PEER_CRH_NOTES})
)
return (
scoredNotes[[c.noteIdKey]]
.merge(totalPeerNotes, how="left")
.merge(totalPeerMisleadingNotes, how="left")
.merge(totalPeerNotMisleadingNotes, how="left")
.merge(totalPeerStabilizationNotes, how="left")
.merge(totalPeerCrhNotes, how="left")
.fillna(0)
.astype(pd.Int64Dtype())
)
def _prepare_note_info(
self,
notes: pd.DataFrame,
ratings: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
prescoringRaterModelOutput: pd.DataFrame,
prepareForTraining: bool,
cutoff: Optional[str],
) -> pd.DataFrame:
"""Generate a DataFrame with one row per note containing all feature information.
Note that some columns contain list values (e.g. a single column contains all
Helpfulness ratings, where each rating is a unique string containing the rater ID
and their Helpfulness rating).
Args:
notes: pd.DataFrame
ratings: pd.DataFrame
noteStatusHistory: pd.DataFrame
prescoringRaterModelOutput: pd.DataFrame
prepareForTraining: True if ratings should be filtered to discard data after the
point of scoring to avoid skew.
cutoff: Whether to prune ratings to those available when a note enters stabilization
or gains CRH status. None if prepareForTraining=False.
Returns:
pd.DataFrame containing all feature information with one row per note.
"""
# Validate and normalize types
assert ((prepareForTraining == False) & (cutoff is None)) | (
prepareForTraining & (cutoff in {_MIN, _MAX})
)
notes[c.tweetIdKey] = notes[c.tweetIdKey].astype(pd.Int64Dtype())
noteStatusHistory[c.createdAtMillisKey] = noteStatusHistory[c.createdAtMillisKey].astype(
pd.Int64Dtype()
)
# Prep notes
scoredNotes = self._get_notes(notes, noteStatusHistory)
if prepareForTraining:
assert cutoff is not None
# Prune to recent notes
scoredNotes = scoredNotes[scoredNotes[c.tweetIdKey] > _MIN_TWEET_ID]
# Validate that stabilization timestamps are valid
assert (
noteStatusHistory[[c.noteIdKey, c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey]].merge(
scoredNotes[[c.noteIdKey]]
)[c.timestampMillisOfFirstNmrDueToMinStableCrhTimeKey]
< 0
).sum() == 0
# Compute flip labels
labels = self._label_notes(noteStatusHistory.merge(scoredNotes[[c.noteIdKey]]))
# Compute scoring cutoffs based on when the note entered and left stabilization
scoringCutoff = self._compute_scoring_cutoff(
noteStatusHistory.merge(scoredNotes[[c.noteIdKey]]),
ratings.merge(scoredNotes[[c.noteIdKey]]),
cutoff,
)
# Validate and merge data, effectively pruning to notes that have a label
scoredNotes = scoredNotes.merge(scoringCutoff, on=c.noteIdKey)
assert len(scoredNotes) == len(scoringCutoff)
assert len(labels) == len(
scoringCutoff.merge(labels)
) # labels should be a subset of scoringCutoff
scoredNotes = scoredNotes.merge(labels)
assert len(scoredNotes) == len(labels)
totalScoredNotes = len(scoredNotes)
# Prep ratings
# Prune ratings to only include scored notes and other notes on the same post
assert scoredNotes[c.tweetIdKey].min() > 0 # tweet should be set for all notes being scored
adjacentNotes = notes[[c.noteIdKey, c.tweetIdKey]].merge(
scoredNotes[[c.tweetIdKey]].drop_duplicates()
)[[c.noteIdKey]]
assert len(adjacentNotes) == adjacentNotes[c.noteIdKey].nunique()
ratings = ratings.merge(adjacentNotes)
assert len(ratings) == len(ratings[[c.noteIdKey, c.raterParticipantIdKey]].drop_duplicates())
# Associate rater factors
raterFactors = self._compute_rater_factors(prescoringRaterModelOutput)
assert len(raterFactors) == raterFactors[c.raterParticipantIdKey].nunique()
ratings = ratings.merge(raterFactors, how="left")
# Generate rating datasets for self, peer misleading and peer non-misleading notes
localRatings = self._prepare_local_ratings(ratings, scoredNotes[[c.noteIdKey]])
peerRatings = self._prepare_peer_ratings(
ratings,
notes[[c.noteIdKey, c.tweetIdKey, c.classificationKey]],
scoredNotes[[c.noteIdKey, c.tweetIdKey]],
)
peerMisleadingRatings = peerRatings[
peerRatings[c.classificationKey] == c.notesSaysTweetIsMisleadingKey
]
peerNonMisleadingRatings = peerRatings[
peerRatings[c.classificationKey] == c.noteSaysTweetIsNotMisleadingKey
]
if prepareForTraining:
localRatings = self._apply_cutoff(localRatings, scoredNotes)
peerMisleadingRatings = self._apply_cutoff(peerMisleadingRatings, scoredNotes)
peerNonMisleadingRatings = self._apply_cutoff(peerNonMisleadingRatings, scoredNotes)
# Extract features
# Generate features that depend on self ratings only
writingLatency = self._get_note_writing_latency(
scoredNotes[[c.noteIdKey, _TWEET_CREATION_MILLIS, _NOTE_CREATION_MILLIS]]
)
scoredNotes = scoredNotes.merge(writingLatency, how="inner")
noteAuthors = noteStatusHistory[[c.noteIdKey, c.noteAuthorParticipantIdKey]]
scoredNotes = scoredNotes.merge(noteAuthors, how="inner")
quickRatings = self._get_quick_rating_stats(
scoredNotes[[c.noteIdKey, _NOTE_CREATION_MILLIS]], localRatings
)
scoredNotes = scoredNotes.merge(quickRatings, how="inner")
burstRatings = self._get_burst_rating_stats(
scoredNotes[[c.noteIdKey]], localRatings[[c.noteIdKey, c.createdAtMillisKey]]
)
scoredNotes = scoredNotes.merge(burstRatings, how="inner")
recentRatings = self._get_recent_rating_stats(
scoredNotes, localRatings[[c.noteIdKey, c.createdAtMillisKey]], prepareForTraining
)
scoredNotes = scoredNotes.merge(recentRatings, how="inner")
peerNoteCounts = self._get_note_counts(
scoredNotes, notes, noteStatusHistory, prepareForTraining
)
scoredNotes = scoredNotes.merge(peerNoteCounts, how="inner")
# Generate features based on self and peer ratings
for df, prefix in [
(localRatings, _LOCAL),
(peerMisleadingRatings, _PEER_MISLEADING),
(peerNonMisleadingRatings, _PEER_NON_MISLEADING),
]:
features = self._make_note_info_from_ratings(scoredNotes, df)
overlapCols = (set(scoredNotes.columns) & set(features.columns)) - {c.noteIdKey}
features = features[[col for col in features.columns if col not in overlapCols]]
features = features.rename(
columns={col: f"{prefix}_{col}" for col in features if col != c.noteIdKey}
)
scoredNotes = scoredNotes.merge(features, how="left")
# Merge rating totals for debugging / info
scoredNotes = scoredNotes.merge(
localRatings[[c.noteIdKey]]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": _LOCAL}),
how="left",
)
scoredNotes = scoredNotes.merge(
peerMisleadingRatings[[c.noteIdKey]]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": _PEER_MISLEADING}),
how="left",
)
scoredNotes = scoredNotes.merge(
peerNonMisleadingRatings[[c.noteIdKey]]
.value_counts()
.to_frame()
.reset_index(drop=False)
.rename(columns={"count": _PEER_NON_MISLEADING}),
how="left",
)
scoredNotes = scoredNotes.fillna(
{_LOCAL: 0, _PEER_MISLEADING: 0, _PEER_NON_MISLEADING: 0}
).astype(
{
_LOCAL: pd.Int64Dtype(),
_PEER_MISLEADING: pd.Int64Dtype(),
_PEER_NON_MISLEADING: pd.Int64Dtype(),
}
)
assert len(scoredNotes) == totalScoredNotes, f"{len(scoredNotes)} vs {totalScoredNotes}"
return scoredNotes
def _get_feature_pipeline(self, noteInfo: pd.DataFrame) -> Pipeline:
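# Build one sub-pipeline per feature family and combine them in a ColumnTransformer: one-hot
# encoding for the note author, CountVectorizer over per-user rating/tag strings, and
# KBinsDiscretizer binning (plus PolynomialFeatures crosses) over the numeric aggregates.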
# Begin with author pipeline
columnPipes: List[Tuple[str, Any, Union[str, List[str]]]] = [
(
c.noteAuthorParticipantIdKey,
Pipeline([("onehot", OneHotEncoder(handle_unknown="ignore"))]),
[c.noteAuthorParticipantIdKey],
)
]
# Add pipelines for individual user helpfulness ratings
for prefix in [_LOCAL, _PEER_MISLEADING, _PEER_NON_MISLEADING]:
column = f"{prefix}_{_USER_HELPFULNESS_RATINGS}"
pipe = Pipeline(
[
("_set_to_list", FunctionTransformer(_set_to_list)),
(
"onehot",
CountVectorizer(
tokenizer=_identity, preprocessor=_identity, min_df=self._helpfulnessRaterMin
),
),
]
)
columnPipes.append((column, pipe, column))
# Add pipelines for individual helpful tag ratings
for prefix in [_LOCAL, _PEER_MISLEADING, _PEER_NON_MISLEADING]:
column = f"{prefix}_{_USER_HELPFUL_TAGS}"
pipe = Pipeline(
[
("_set_to_list", FunctionTransformer(_set_to_list)),
(
"onehot",
CountVectorizer(tokenizer=_identity, preprocessor=_identity, min_df=self._tagRaterMin),
),
("selection", SelectPercentile(chi2, percentile=self._helpfulTagPercentile)),
]
)
columnPipes.append((column, pipe, column))
# Add pipelines for individual not-helpful tag ratings
for prefix in [_LOCAL, _PEER_MISLEADING, _PEER_NON_MISLEADING]:
column = f"{prefix}_{_USER_NOT_HELPFUL_TAGS}"
pipe = Pipeline(
[
("_set_to_list", FunctionTransformer(_set_to_list)),
(
"onehot",
CountVectorizer(tokenizer=_identity, preprocessor=_identity, min_df=self._tagRaterMin),
),
("selection", SelectPercentile(chi2, percentile=self._notHelpfulTagPercentile)),
]
)
columnPipes.append((column, pipe, column))
# Add pipelines for tag ratio columns
for prefix in [_LOCAL, _PEER_MISLEADING, _PEER_NON_MISLEADING]:
for tagset in [c.notHelpfulTagsTSVOrder, c.helpfulTagsTSVOrder]:
for tag in tagset:
column = f"{prefix}_{tag}"
self._column_thresholds[column] = noteInfo[column].quantile(0.99)
if noteInfo[column].min() == noteInfo[column].max():
continue
pipe = Pipeline(
[
("reshape", FunctionTransformer(_reshape)),
("drop_constants", VarianceThreshold()),
(
"binize",
KBinsDiscretizer(n_bins=self._tag_ratio_bins, encode="onehot", strategy="kmeans"),
),
]
)
columnPipes.append((column, pipe, column))
# Add pipelines for rating counts across notes
columns = []
for prefix in [_LOCAL, _PEER_MISLEADING, _PEER_NON_MISLEADING]:
for col in _BUCKET_COUNT_COLS:
columns.append(f"{prefix}_{col}")
assert noteInfo[columns].isna().sum().sum() == 0
for col in columns:
pipe = Pipeline(
[
("log", FunctionTransformer(_feature_log)),
(
"binize",
KBinsDiscretizer(n_bins=self._rating_count_bins, encode="onehot", strategy="kmeans"),
),
]
)
columnPipes.append((col, pipe, [col]))
for degree in [2, 3]:
pipe = Pipeline(
[
("log", FunctionTransformer(_feature_log)),
(
"binize",
KBinsDiscretizer(n_bins=self._rating_count_bins, encode="onehot", strategy="kmeans"),
),
("drop_rare", VarianceThreshold(threshold=0.001)),
(
"cross",
PolynomialFeatures(degree=(degree, degree), interaction_only=True, include_bias=False),
),
("drop_rare_again", VarianceThreshold(threshold=0.001)),
]
)
columnPipes.append((f"cross_note_counts_degree_{degree}", pipe, columns))
# Add pipelines for rating counts within notes
for prefix in [_LOCAL, _PEER_MISLEADING, _PEER_NON_MISLEADING]:
columns = []
for col in _BUCKET_COUNT_COLS:
columns.append(f"{prefix}_{col}")
assert noteInfo[columns].isna().sum().sum() == 0
pipe = Pipeline(
[
("log", FunctionTransformer(_feature_log)),
(
"binize",
KBinsDiscretizer(n_bins=self._rating_count_bins, encode="onehot", strategy="kmeans"),
),
("drop_rare", VarianceThreshold(threshold=0.001)),
("cross_0", PolynomialFeatures(degree=(2, 2), interaction_only=True, include_bias=False)),
("cross_1", PolynomialFeatures(degree=(2, 2), interaction_only=True, include_bias=False)),
("drop_rare_again", VarianceThreshold(threshold=0.001)),
]
)
columnPipes.append((f"{prefix}_cross_note_counts_degree_4", pipe, columns))
# Add pipelines for rater factor stats
for prefix in [_LOCAL, _PEER_MISLEADING, _PEER_NON_MISLEADING]:
columns = []
for col in _STATS_COLS:
columns.append(f"{prefix}_{col}")
pipe = Pipeline(
[
("fill_nans_df", FunctionTransformer(_fill_na)),
(
"binize",
KBinsDiscretizer(n_bins=self._factor_stats_bins, encode="onehot", strategy="kmeans"),
),
("cross", PolynomialFeatures(degree=(1, 2), interaction_only=True, include_bias=False)),
("drop_rare", VarianceThreshold(threshold=0.001)),
]
)
columnPipes.append((f"{prefix}_stat_cols", pipe, columns))
# Add pipelines for rating bursts
for colset in [_QUICK_RATING_COLS, _BURST_RATING_COLS, _RECENT_RATING_COLS]:
for col in colset:
assert noteInfo[col].isna().sum() == 0
if "RATIO" in col:
self._column_thresholds[col] = noteInfo[col].quantile(0.999)
pipe = Pipeline(
[
(
"binize",
KBinsDiscretizer(n_bins=self._burst_bins, encode="onehot", strategy="kmeans"),
),
]
)
else:
assert "TOTAL" in col
self._column_thresholds[col] = noteInfo[col].quantile(0.999)
pipe = Pipeline(
[
(
"binize",
KBinsDiscretizer(n_bins=self._burst_bins, encode="onehot", strategy="kmeans"),
),
]
)
columnPipes.append((col, pipe, [col]))
# Add pipeline for note writing latency
assert noteInfo[_NOTE_WRITING_LATENCY].isna().sum() == 0
self._column_thresholds[_NOTE_WRITING_LATENCY] = noteInfo[_NOTE_WRITING_LATENCY].quantile(0.999)
pipe = Pipeline(
[
("binize", KBinsDiscretizer(n_bins=self._latency_bins, encode="onehot", strategy="kmeans")),
]
)
columnPipes.append((_NOTE_WRITING_LATENCY, pipe, [_NOTE_WRITING_LATENCY]))
# Add columns for peer notes
peerNoteCols = [
_TOTAL_PEER_NOTES,
_TOTAL_PEER_MISLEADING_NOTES,
_TOTAL_PEER_NON_MISLEADING_NOTES,
_TOTAL_PEER_CRH_NOTES,
_TOTAL_PEER_STABILIZATION_NOTES,
]
assert noteInfo[peerNoteCols].isna().sum().sum() == 0
for col in peerNoteCols:
self._column_thresholds[col] = noteInfo[col].quantile(0.9999)
pipe = Pipeline(
[
(
"binize",
KBinsDiscretizer(n_bins=self._peer_note_count_bins, encode="onehot", strategy="kmeans"),
),
]
)
columnPipes.append((col, pipe, [col]))
pipe = Pipeline(
[
("log", FunctionTransformer(_feature_log)),
(
"binize",
KBinsDiscretizer(n_bins=self._peer_note_count_bins, encode="onehot", strategy="kmeans"),
),
("cross", PolynomialFeatures(degree=(2, 2), interaction_only=True, include_bias=False)),
("drop_rare", VarianceThreshold(threshold=0.001)),
]
)
columnPipes.append(("peer_note_cross_degree_2", pipe, peerNoteCols))
# Build and return column transformer
return ColumnTransformer(columnPipes, verbose=True)
def _get_model_pipeline(self, noteInfo: pd.DataFrame) -> Pipeline:
"""Return a full model pipeline including feature extraction and model.
Note that the pipeline includes a VarianceThreshold filter between feature extraction
and model training to drop any features that have zero variance (i.e. always have the
same value).
Returns:
Pipeline containing feature extraction, variance threshold and model.
"""
return Pipeline(
[
("feature_extraction", self._get_feature_pipeline(noteInfo)),
("drop_constants", VarianceThreshold()),
(
"lr",
LogisticRegression(
penalty=self._penalty,
C=self._C,
max_iter=self._maxIter,
verbose=self._verbose,
class_weight=self._classWeight,
solver=self._solver,
tol=self._tol,
),
),
]
)
def _transform_note_info(self, noteInfo: pd.DataFrame) -> pd.DataFrame:
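# Clip and log-scale raw feature columns using the per-column quantile thresholds recorded
# while building the feature pipeline during fit(), so outliers are bounded consistently at
# training and prediction time.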
noteInfo = noteInfo.copy()
# Transform tag ratio columns
for prefix in [_LOCAL, _PEER_MISLEADING, _PEER_NON_MISLEADING]:
for tagset in [c.notHelpfulTagsTSVOrder, c.helpfulTagsTSVOrder]:
for tag in tagset:
column = f"{prefix}_{tag}"
threshold = self._column_thresholds[column]
noteInfo[column] = noteInfo[column].fillna(-0.25).clip(-0.25, threshold)
# Transform for rating bursts
for colset in [_QUICK_RATING_COLS, _BURST_RATING_COLS, _RECENT_RATING_COLS]:
for column in colset:
threshold = self._column_thresholds[column]
if "RATIO" in column:
noteInfo[column] = noteInfo[column].clip(0, threshold)
else:
assert "TOTAL" in column
noteInfo[column] = np.log(1 + noteInfo[column].clip(0, threshold)) / np.log(2)
# Transform for writing latency
threshold = self._column_thresholds[_NOTE_WRITING_LATENCY]
noteInfo[_NOTE_WRITING_LATENCY] = np.log(
noteInfo[_NOTE_WRITING_LATENCY].clip(0, threshold)
) / np.log(2)
# Transform for peer notes
peerNoteCols = [
_TOTAL_PEER_NOTES,
_TOTAL_PEER_MISLEADING_NOTES,
_TOTAL_PEER_NON_MISLEADING_NOTES,
_TOTAL_PEER_CRH_NOTES,
_TOTAL_PEER_STABILIZATION_NOTES,
]
for column in peerNoteCols:
threshold = self._column_thresholds[column]
noteInfo[column] = np.log(1 + noteInfo[column].clip(0, threshold)) / np.log(2)
return noteInfo
def _get_label_vector(self, noteInfo: pd.DataFrame) -> np.array:
"""Extract a binary label vector from a noteInfo DataFrame."""
return (noteInfo[LABEL] == FLIP).values.astype(np.int8)
def _evaluate_model(
self, noteInfo: pd.DataFrame, threshold: Optional[float] = None
) -> Tuple[float, float, float, float]:
"""Apply a pipeline to noteInfo and return the AUC, TPR, FPR and associated threshold.
Assumes that the pipeline has already been fit. If a threshold is provided as an
argument then that threshold is used. Otherwise, selects the threshold that yields
a false positive rate closest to self._crhFpRate.
Args:
noteInfo: pd.DataFrame containing raw feature information and labels.
Returns:
Tuple containing threshold, TPR, FPR and AUC.
"""
assert self._pipeline is not None, "pipeline must be fit prior to evaluation"
labels = self._get_label_vector(noteInfo)
predictions = self._pipeline.decision_function(noteInfo)
fpRates, tpRates, thresholds = roc_curve(labels, predictions)
auc = area_under_curve(fpRates, tpRates)
if threshold is None:
idx = np.argmin(np.abs(fpRates - self._crhFpRate))
threshold = thresholds[idx]
fpr = fpRates[idx]
tpr = tpRates[idx]
else:
tn, fp, fn, tp = confusion_matrix(
labels, (predictions > threshold).astype(np.int8), labels=np.arange(2)
).ravel()
fpr = fp / (fp + tn)
tpr = tp / (tp + fn)
logger.info(f"threshold={threshold} tpr={tpr} fpr={fpr} auc={auc}")
return (threshold, tpr, fpr, auc)
def _convert_col_types(self, noteInfo: pd.DataFrame) -> pd.DataFrame:
"""Convert pandas types to numpy types.
This conversion is necessary for scikit-learn compatibility.
"""
for col, dtype in noteInfo.dtypes.to_dict().items():
if isinstance(dtype, pd.Int64Dtype):
assert noteInfo[col].isna().sum() == 0
noteInfo[col] = noteInfo[col].astype(np.int64)
if isinstance(dtype, pd.Float64Dtype):
noteInfo[col] = noteInfo[col].astype(np.float64)
return noteInfo
def _profile_pipeline(self, pipe: Pipeline, noteInfo: pd.DataFrame) -> str:
"""Generate a numerical profile of each extracted feature.
For each feature, we examine the dimensionality and sparsity of the feature. For low
dimensional features representing discretized continuous values, we also profile the
size and boundaries of each bin.
"""
# Generate feature matrix
matrix = pipe.transform(noteInfo)
# Profile matrix
start = 0
lines = []
for name, transformer, _ in pipe.transformers_:
if name == "remainder":
continue
end = start + len(transformer[-1].get_feature_names_out())
total = int(matrix[:, start:end].sum())
colMin = int(matrix[:, start:end].sum(axis=0).min())
colMean = total / (end - start)
colMax = int(matrix[:, start:end].sum(axis=0).max())
rowMin = int(matrix[:, start:end].sum(axis=1).min())
rowMean = total / (matrix.shape[0])
rowMax = int(matrix[:, start:end].sum(axis=1).max())
columns = [
f"{name:<60}pos=[{start:8} {end:8} {end-start:8}]",
f"total={total:9}",
f"col=[{colMin:8} {colMean:8.1f} {colMax:8}]",
f"row=[{rowMin:8} {rowMean:8.1f} {rowMax:8}]",
]
if (end - start) <= 10:
columns.append(f"{str(matrix[:, start:end].sum(axis=0).astype(np.int64)):<80}")
columns.append(str(transformer[-1].bin_edges_[0].round(3).tolist()))
lines.append(" ".join(columns))
start = end
return "\n".join(lines)
def fit(
self,
notes: pd.DataFrame,
ratings: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
prescoringRaterModelOutput: pd.DataFrame,
) -> None:
"""Fit and evaluate a sklearn pipeline for predicting note status.
Args:
notes: pd.DataFrame
ratings: pd.DataFrame
noteStatusHistory: pd.DataFrame
prescoringRaterModelOutput: pd.DataFrame
Returns:
None. The fitted sklearn pipeline, covering the full process of feature extraction,
feature selection and prediction, is stored on the object for later use by predict().
"""
# Apply seed if necessary
if self._seed is not None:
logger.info(f"seeding pflip: {self._seed}")
np.random.seed(self._seed)
# Prepare datasets
noteInfo = pd.concat(
[
self._prepare_note_info(
notes,
ratings,
noteStatusHistory,
prescoringRaterModelOutput,
prepareForTraining=True,
cutoff=_MIN,
),
self._prepare_note_info(
notes,
ratings,
noteStatusHistory,
prescoringRaterModelOutput,
prepareForTraining=True,
cutoff=_MAX,
),
]
)
noteInfo = self._convert_col_types(noteInfo)
noteInfo = noteInfo.sort_values(c.noteIdKey)
if len(noteInfo) == 0:
return
logger.info(f"noteInfo summary: {get_df_fingerprint(noteInfo, [c.noteIdKey])}")
# Dividing training data temporally provides a more accurate measurement, but would also
# require excluding the newest data from training.
trainDataFrame, validationDataFrame = train_test_split(noteInfo, train_size=self._trainSize)
logger.info(f"pflip training data size: {len(trainDataFrame)}")
logger.info(f"trainDataFrame summary: {get_df_fingerprint(trainDataFrame, [c.noteIdKey])}")
logger.info(f"pflip validation data size: {len(validationDataFrame)}")
logger.info(
f"validationDataFrame summary: {get_df_fingerprint(validationDataFrame, [c.noteIdKey])}"
)
# Fit model
self._pipeline = self._get_model_pipeline(trainDataFrame)
trainDataFrame = self._transform_note_info(trainDataFrame)
validationDataFrame = self._transform_note_info(validationDataFrame)
self._pipeline.fit(trainDataFrame, self._get_label_vector(trainDataFrame))
featureProfile = self._profile_pipeline(self._pipeline[0], trainDataFrame)
logger.info(f"\ntraining feature matrix profile:\n{featureProfile}")
# Evaluate model
logger.info("Training Results:")
threshold, _, _, _ = self._evaluate_model(trainDataFrame)
self._predictionThreshold = threshold
logger.info("Validation Results:")
self._evaluate_model(validationDataFrame, threshold=threshold)
def serialize(self) -> bytes:
"""Return a serialized version of the PFlipModel object.
Note that since the pflip pipeline includes CountVectorizer instances that have
functions as parameters, joblib must be called from within this same module to be
able to serialize the functions.
Returns:
bytes containing a serialized PFlipPlusModel object
"""
buffer = BytesIO()
joblib.dump(self, buffer)
return buffer.getvalue()
def predict(
self,
notes: pd.DataFrame,
ratings: pd.DataFrame,
noteStatusHistory: pd.DataFrame,
prescoringRaterModelOutput: pd.DataFrame,
maxBatchSize: int = 10000,
) -> pd.DataFrame:
"""Given input DataFrames, predict which notes will flip and lose CRH status.
Args:
notes: pd.DataFrame
ratings: pd.DataFrame
noteStatusHistory: pd.DataFrame
prescoringRaterModelOutput: pd.DataFrame
Returns:
pd.DataFrame containing noteIds and predicted labels
"""
assert self._pipeline is not None, "pipeline must be initialized prior to prediction"
assert (
self._predictionThreshold is not None
), "threshold must be initialized prior to prediction"
# Build list of unique tweetIds
tweetIds = notes[[c.tweetIdKey]].drop_duplicates()
tweetIds = tweetIds[tweetIds[c.tweetIdKey] != "-1"]
# Iterate through batches
results = [pd.DataFrame({c.noteIdKey: [], LABEL: []})]
start = 0
while start < len(tweetIds):
logger.info(f"processing prediction batch: {start}")
# Note that all peer tweets must be included in the same batch for correct feature extraction
tweetBatch = tweetIds.iloc[start : (start + maxBatchSize)]
noteBatch = (
notes[[c.noteIdKey, c.tweetIdKey]].merge(tweetBatch)[[c.noteIdKey]].drop_duplicates()
)
noteInfo = self._prepare_note_info(
notes.merge(noteBatch),
ratings.merge(noteBatch),
noteStatusHistory.merge(noteBatch),
prescoringRaterModelOutput,
prepareForTraining=False,
cutoff=None,
)
noteInfo = self._convert_col_types(noteInfo)
noteInfo = self._transform_note_info(noteInfo)
predictions = self._pipeline.decision_function(noteInfo)
results.append(
pd.DataFrame(
{
c.noteIdKey: noteInfo[c.noteIdKey],
LABEL: [FLIP if p > self._predictionThreshold else CRH for p in predictions],
}
)
)
start += maxBatchSize
return pd.concat(results)