"""data_validation.py

Data validation metrics for search term sanitization jobs: calculate them from
job metadata in BigQuery, export and retrieve them, and run range and
moving-average checks that alarm when recent values drift outside historical
percentile bounds.
"""
import re
from collections import namedtuple
from datetime import date, timedelta, datetime
import pandas as pd
from google.cloud import bigquery
project = "moz-fx-mfouterbounds-prod-f98d"
def calculate_data_validation_metrics(metadata_source, languages_source):
"""
Calculate metrics for determining whether our search volume is changing in ways that might invalidate our current sanitization model.
Arguments:
- metadata_source: a string. The name of the table containing the metadata to be fetched.
- languages_source: a string. The name of the table containing language distributions for search term jobs.
Returns: A dataframe of the data validation metrics for the sanitization jobs.
"""
if re.fullmatch(r"[A-Za-z0-9\.\-\_]+", metadata_source):
metadata_source_no_injection = metadata_source
else:
raise Exception(
"metadata_source in incorrect format. This should be a fully qualified table name like myproject.mydataset.my_table"
)
if re.fullmatch(r"[A-Za-z0-9\.\-\_]+", languages_source):
languages_source_no_injection = languages_source
else:
raise Exception(
"metadata_source in incorrect format. This should be a fully qualified table name like myproject.mydataset.my_table"
)
# We are using f-strings here because BQ does not allow table names to be parametrized
# and we need to be able to run the same script in the staging and prod db environments for reliable testing outcomes.
SUCCESSFUL_SANITIZATION_JOB_RUN_METADATA = f"""
SELECT
finished_at,
SAFE_DIVIDE(total_search_terms_removed_by_sanitization_job, total_search_terms_analyzed) AS pct_sanitized_search_terms,
SAFE_DIVIDE(contained_at, total_search_terms_analyzed) AS pct_sanitized_contained_at,
SAFE_DIVIDE(contained_numbers, total_search_terms_analyzed) AS pct_sanitized_contained_numbers,
SAFE_DIVIDE(contained_name, total_search_terms_analyzed) AS pct_sanitized_contained_name,
SAFE_DIVIDE(sum_terms_containing_us_census_surname, total_search_terms_analyzed) AS pct_terms_containing_us_census_surname,
SAFE_DIVIDE(sum_uppercase_chars_all_search_terms, sum_chars_all_search_terms) AS pct_uppercase_chars_all_search_terms,
SAFE_DIVIDE(sum_words_all_search_terms, total_search_terms_analyzed) AS avg_words_all_search_terms,
1 - SAFE_DIVIDE(languages.english_count, languages.all_languages_count) AS pct_terms_non_english
FROM `{metadata_source_no_injection}` AS metadata
JOIN
(
        SELECT
            job_start_time,
            MAX(CASE WHEN language_code = 'en' THEN search_term_count END) AS english_count,
            SUM(search_term_count) AS all_languages_count
        FROM `{languages_source_no_injection}`
        GROUP BY job_start_time
) AS languages
ON metadata.started_at = languages.job_start_time
WHERE status = 'SUCCESS'
ORDER BY finished_at ASC;
"""
client = bigquery.Client(project=project)
query_job = client.query(SUCCESSFUL_SANITIZATION_JOB_RUN_METADATA)
results_as_dataframe = query_job.result().to_dataframe()
return results_as_dataframe
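
# A minimal usage sketch, assuming hypothetical table names (the real staging
# and prod tables are configured elsewhere):
#
#     validation_df = calculate_data_validation_metrics(
#         metadata_source="myproject.mydataset.sanitization_job_metadata",
#         languages_source="myproject.mydataset.sanitization_job_languages",
#     )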
def export_data_validation_metrics_to_bigquery(dataframe, destination_table_id):
"""
Append data validation metrics to the BigQuery table tracking these metrics from job metadata.
Arguments:
- dataframe: A dataframe of validation metrics to be added.
- destination_table_id: the fully qualified name of the table for the data to be exported into.
    Returns: Nothing.
    Prints the list of per-row insert errors (empty lists on success) as a cursory logging mechanism; that result can be parsed and routed to whatever logging we like.
"""
client = bigquery.Client(project=project)
schema = [
bigquery.SchemaField("finished_at", bigquery.enums.SqlTypeNames.STRING),
bigquery.SchemaField(
"pct_sanitized_search_terms", bigquery.enums.SqlTypeNames.FLOAT64
),
bigquery.SchemaField(
"pct_sanitized_contained_at", bigquery.enums.SqlTypeNames.FLOAT64
),
bigquery.SchemaField(
"pct_sanitized_contained_numbers", bigquery.enums.SqlTypeNames.FLOAT64
),
bigquery.SchemaField(
"pct_sanitized_contained_name", bigquery.enums.SqlTypeNames.FLOAT64
),
bigquery.SchemaField(
"pct_terms_containing_us_census_surname",
bigquery.enums.SqlTypeNames.FLOAT64,
),
bigquery.SchemaField(
"pct_uppercase_chars_all_search_terms", bigquery.enums.SqlTypeNames.FLOAT64
),
bigquery.SchemaField(
"avg_words_all_search_terms", bigquery.enums.SqlTypeNames.FLOAT64
),
bigquery.SchemaField(
"pct_terms_non_english", bigquery.enums.SqlTypeNames.FLOAT64
),
]
destination_table = bigquery.Table(destination_table_id)
    errors = client.insert_rows_from_dataframe(
        table=destination_table, dataframe=dataframe, selected_fields=schema
    )
    print(errors)
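
# A sketch of exporting freshly calculated metrics, continuing from the sketch
# above; the destination table name is a hypothetical placeholder:
#
#     export_data_validation_metrics_to_bigquery(
#         validation_df, "myproject.mydataset.search_term_data_validation_metrics"
#     )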
def retrieve_data_validation_metrics(metrics_source):
"""
Pull all the sanitization job data validation metrics.
Arguments:
    - metrics_source: a string. The name of the table containing the data validation metrics to be fetched.
Returns: A dataframe of the data validation metrics.
"""
if re.fullmatch(r"[A-Za-z0-9\.\-\_]+", metrics_source):
metrics_source_no_injection = metrics_source
else:
raise Exception(
"metadata_source in incorrect format. This should be a fully qualified table name like myproject.mydataset.my_table"
)
# We are using f-strings here because BQ does not allow table names to be parametrized
# and we need to be able to run the same script in the staging and prod db environments for reliable testing outcomes.
DATA_VALIDATION_METRICS_QUERY = f"""
SELECT
*
FROM `{metrics_source_no_injection}` AS metadata
ORDER BY finished_at ASC;
"""
client = bigquery.Client(project=project)
query_job = client.query(DATA_VALIDATION_METRICS_QUERY)
results_as_dataframe = query_job.result().to_dataframe()
return results_as_dataframe
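
# A sketch of pulling the accumulated metrics back out, using the same
# hypothetical table name as above:
#
#     metrics_df = retrieve_data_validation_metrics(
#         "myproject.mydataset.search_term_data_validation_metrics"
#     )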
def range_check(
validation_data: pd.DataFrame,
metric: str,
full_lookback_window: int,
test_window: int,
range_lower_bound: float,
range_upper_bound: float,
):
print(f"Performing range check for metric: {metric}")
"""
Determines if all the values in a test window of days fall inside some percentile of the normal range for a set of comparison values in a comparison window of days.
Inputs:
- validation_data: the dataframe with the data in it to be checked.
ASSUMES the presence of a 'finished_at' column, whose date is used to calculate lookback and test windows.
- metric: the name of the column in the input dataframe on which to perform the check.
- full_lookback_window: an integer number of days that the comparison set should cover.
- test_window. an integer number of days that the test set should cover.
ASSUMES that the test window immediately succeeds the full_lookback_window.
- range_lower_bound: a float between 0 and 1 indicating the lower edge of the window of normal values from the comparison set
inside which at least one of the values in the test set should fall.
- range_upper_bound: a float between 0 and 1 indicating the upper edge of the window of normal values from the comparison set
inside which at least one of the values in the test set should fall.
Outputs:
- finished_at: the finished_at timestamp of the job run to which this check applies.
- num_values_compared: an integer representing the total number of range values included in this comparison.
- should_trigger: a bool indicating whether the values in the test window are all falling OUTSIDE the expected range.
- range_lower: a float. The lower bound of the expected range calculated from comparison values.
- range_upper: a float. The upper bound of the expected range calculated from comparison values.
- test_range: a list. The entirety of the test values.
"""
if not (0 < range_lower_bound < 1 and 0 < range_upper_bound < 1):
raise Exception(
"range_lower_bound and range_upper_bound should both be between zero (0) and one (1)."
)
if "finished_at" not in validation_data.columns.values:
raise Exception("dataframe must include a finished_at column.")
if metric not in validation_data.columns.values:
raise Exception(f'dataframe does not include target metric "{metric}"')
today = date.today()
latest_finished_at = max(validation_data["finished_at"])
test_earliest_date = today - timedelta(days=test_window)
comparison_earliest_date = test_earliest_date - timedelta(days=full_lookback_window)
comparison_values = validation_data["finished_at"].apply(
lambda m: comparison_earliest_date < m.date() <= test_earliest_date
)
test_values = validation_data["finished_at"].apply(
lambda m: test_earliest_date < m.date() <= today
)
comparison_range = validation_data.loc[comparison_values]
test_range = validation_data.loc[test_values]
range_lower, range_upper = comparison_range[metric].quantile(
q=[range_lower_bound, range_upper_bound]
)
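    # Alarm only when every test value sits on the same side outside the band;
    # a single in-band (or straddling) test value keeps the alarm quiet.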
should_trigger = len(test_range[metric]) != 0 and (
all(test_range[metric] > range_upper) or all(test_range[metric] < range_lower)
)
print(f"Completed range check for metric: {metric}")
return (
latest_finished_at,
len(comparison_range),
should_trigger,
range_lower,
range_upper,
list(test_range[metric]),
)
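
# A worked sketch of range_check on synthetic data (all values made up). The
# 26 lookback rows are constant at 0.02, so the 12.5th-87.5th percentile band
# collapses to [0.02, 0.02]; the four test-window values all sit above it, so
# the check triggers:
#
#     df = pd.DataFrame({
#         "finished_at": pd.date_range(end=pd.Timestamp.today(), periods=30),
#         "pct_sanitized_search_terms": [0.02] * 26 + [0.09, 0.10, 0.11, 0.12],
#     })
#     finished_at, n, triggered, low, high, test_vals = range_check(
#         df, "pct_sanitized_search_terms",
#         full_lookback_window=26, test_window=4,
#         range_lower_bound=0.125, range_upper_bound=0.875,
#     )
#     assert triggered  # all four test values exceed the upper bound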
def mean_check(
validation_data: pd.DataFrame,
metric: str,
full_lookback_window: int,
test_window: int,
moving_average_window: int,
mean_lower_bound: float,
mean_upper_bound: float,
):
print(f"Performing mean check for metric: {metric}")
"""
Determines if all the moving averages in a test window of days fall inside some percentile of the moving average for a set of comparison values in a comparison window of days.
Inputs:
- validation_data: the dataframe with the data in it to be checked.
ASSUMES the presence of a 'finished_at' column, whose date is used to calculate lookback and test windows.
- metric: the name of the column in the input dataframe on which to perform the check.
- full_lookback_window: an integer number of days that the comparison set should cover.
- test_window. an integer number of days that the test set should cover.
ASSUMES that the test window immediately succeeds the full_lookback_window.
- moving_average_window: an integer. Number of prior days over which to calculate an average for a given day.
- mean lower bound: a float between 0 and 1 indicating the lower edge of the window of normal values from the comparison set
inside which at least one of the values in the test set should fall.
- mean upper bound: a float between 0 and 1 indicating the upper edge of the window of normal values from the comparison set
inside which at least one of the values in the test set should fall.
Outputs:
- finished_at: the finished_at timestamp of the job run to which this check applies.
- num_moving_averages_compared: an integer representing the total number of moving average values included in this comparison.
- should_trigger: a bool indicating whether the values in the test window are all falling OUTSIDE the expected range.
- mean_lower: a float. The lower bound of the expected range of moving averages calculated from comparison values.
- mean_upper: a float. The upper bound of the expected range of moving averages calculated from comparison values.
- moving_average_windo: an integer. The moving average window passed into the function.
- test_moving_averages: a list. The entirety of the test values.
"""
if not (0 < mean_lower_bound < 1 and 0 < mean_upper_bound < 1):
raise Exception(
"mean_lower_bound and mean_upper_bound should both be between zero (0) and one (1)."
)
if "finished_at" not in validation_data.columns.values:
raise Exception("dataframe must include a finished_at column.")
if metric not in validation_data.columns.values:
raise Exception(f'dataframe does not include target metric "{metric}"')
today = date.today()
latest_finished_at = max(validation_data["finished_at"])
test_earliest_date = today - timedelta(days=test_window)
comparison_earliest_date = test_earliest_date - timedelta(days=full_lookback_window)
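    # NOTE: the rolling mean below is computed over the prior rows (job runs)
    # rather than calendar days, across the full frame before windowing, and it
    # adds a column to the caller's dataframe in place.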
x_day_moving_average = f"{moving_average_window}_day_{metric}_moving_avg"
validation_data[x_day_moving_average] = (
validation_data[metric]
.rolling(window=moving_average_window, min_periods=0)
.mean()
)
comparison_values = validation_data["finished_at"].apply(
lambda m: comparison_earliest_date < m.date() <= test_earliest_date
)
test_values = validation_data["finished_at"].apply(
lambda m: test_earliest_date < m.date() <= today
)
comparison_range = validation_data.loc[comparison_values]
test_range = validation_data.loc[test_values]
mean_lower, mean_upper = comparison_range[x_day_moving_average].quantile(
q=[mean_lower_bound, mean_upper_bound]
)
test_moving_averages = test_range[x_day_moving_average]
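    # As with range_check, alarm only when every test moving average falls on
    # the same side outside the percentile band.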
should_trigger = len(test_moving_averages) != 0 and (
all(test_moving_averages > mean_upper) or all(test_moving_averages < mean_lower)
)
num_moving_averages_compared = int(
comparison_range[x_day_moving_average].notna().sum()
)
print(f"Completed mean check for metric: {metric}")
return (
latest_finished_at,
num_moving_averages_compared,
should_trigger,
mean_lower,
mean_upper,
moving_average_window,
list(test_moving_averages),
)
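
# A companion sketch for mean_check, reusing the synthetic df from the
# range_check example above. The 7-row moving averages of the 8 test rows must
# all fall outside the 1st-99th percentile band of the lookback moving averages
# for the alarm to trigger:
#
#     finished_at, n, triggered, low, high, window, test_avgs = mean_check(
#         df, "pct_sanitized_search_terms",
#         full_lookback_window=22, test_window=8,
#         moving_average_window=7,
#         mean_lower_bound=0.01, mean_upper_bound=0.99,
#     )
#     # Here triggered is False: the earliest test-window moving averages still
#     # equal the 0.02 band edge, so not every value is outside the band.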
def record_validation_results(val_df, destination_table):
    """
    Run the range and mean checks for each tracked metric and append one row of results per metric to the destination BigQuery table.
    Arguments:
    - val_df: a dataframe of data validation metrics, e.g. as returned by retrieve_data_validation_metrics.
    - destination_table: the fully qualified name of the table into which check results are inserted.
    Returns: Nothing. Prints any insert errors as a cursory logging mechanism.
    """
    print(f"Recording validation results to destination table: {destination_table}")
InputSet = namedtuple(
"InputSet",
"name full_lookback_window range_test_window range_lower_bound range_upper_bound mean_test_window mean_lower_bound mean_upper_bound moving_average_window",
)
client = bigquery.Client(project=project)
started_at = datetime.utcnow()
for metric in [
InputSet(
name="pct_sanitized_search_terms",
full_lookback_window=90,
range_test_window=4,
range_lower_bound=0.125,
range_upper_bound=0.875,
mean_test_window=8,
mean_lower_bound=0.01,
mean_upper_bound=0.99,
moving_average_window=7,
),
InputSet(
name="pct_sanitized_contained_at",
full_lookback_window=90,
range_test_window=4,
range_lower_bound=0.125,
range_upper_bound=0.875,
mean_test_window=8,
mean_lower_bound=0.025,
mean_upper_bound=0.975,
moving_average_window=7,
),
InputSet(
name="pct_sanitized_contained_numbers",
full_lookback_window=90,
range_test_window=3,
range_lower_bound=0.075,
range_upper_bound=0.925,
mean_test_window=8,
mean_lower_bound=0.01,
mean_upper_bound=0.99,
moving_average_window=7,
),
InputSet(
name="pct_sanitized_contained_name",
full_lookback_window=90,
range_test_window=5,
range_lower_bound=0.025,
range_upper_bound=0.975,
mean_test_window=7,
mean_lower_bound=0.01,
mean_upper_bound=0.99,
moving_average_window=7,
),
InputSet(
name="pct_terms_containing_us_census_surname",
full_lookback_window=90,
range_test_window=3,
range_lower_bound=0.1,
range_upper_bound=0.9,
mean_test_window=8,
mean_lower_bound=0.01,
mean_upper_bound=0.99,
moving_average_window=9,
),
InputSet(
name="pct_uppercase_chars_all_search_terms",
full_lookback_window=90,
range_test_window=4,
range_lower_bound=0.075,
range_upper_bound=0.925,
mean_test_window=8,
mean_lower_bound=0.01,
mean_upper_bound=0.99,
moving_average_window=7,
),
InputSet(
name="avg_words_all_search_terms",
full_lookback_window=90,
range_test_window=4,
range_lower_bound=0.125,
range_upper_bound=0.875,
mean_test_window=8,
mean_lower_bound=0.025,
mean_upper_bound=0.975,
moving_average_window=7,
),
InputSet(
name="pct_terms_non_english",
full_lookback_window=90,
range_test_window=4,
range_lower_bound=0.125,
range_upper_bound=0.875,
mean_test_window=8,
mean_lower_bound=0.01,
mean_upper_bound=0.99,
moving_average_window=5,
),
]:
(
finished_at,
num_ranges_compared,
range_alarm,
range_low,
range_high,
range_test_vals,
) = range_check(
val_df,
metric.name,
metric.full_lookback_window,
metric.range_test_window,
metric.range_lower_bound,
metric.range_upper_bound,
)
(
finished_at,
num_moving_averages_compared,
mean_alarm,
mean_low,
mean_high,
mean_window,
mean_test_vals,
) = mean_check(
val_df,
metric.name,
metric.full_lookback_window,
metric.mean_test_window,
metric.moving_average_window,
metric.mean_lower_bound,
metric.mean_upper_bound,
)
rows_to_insert = [
{
"from_sanitization_job_finished_at": finished_at.strftime(
"%Y-%m-%d %H:%M:%S"
),
"started_at": started_at.strftime("%Y-%m-%d %H:%M:%S"),
"range_alarm": range_alarm,
"range_low": range_low,
"range_high": range_high,
"num_ranges_compared": num_ranges_compared,
"range_test_vals": str(range_test_vals),
"mean_alarm": mean_alarm,
"mean_low": mean_low,
"mean_high": mean_high,
"num_moving_averages_compared": num_moving_averages_compared,
"mean_test_vals": str(mean_test_vals),
"metric": metric.name,
"full_lookback_window_num_days": metric.full_lookback_window,
"range_test_window_num_days": metric.range_test_window,
"mean_test_window_num_days": metric.mean_test_window,
"moving_average_window_num_days": metric.moving_average_window,
"range_percentile_lower_bound": metric.range_lower_bound,
"range_percentile_upper_bound": metric.range_upper_bound,
"mean_percentile_lower_bound": metric.range_lower_bound,
"mean_percentile_upper_bound": metric.range_upper_bound,
},
]
errors = client.insert_rows_json(destination_table, rows_to_insert)
if errors:
print(f"Problem recording data validation results: {errors}")
else:
print("Data validation results recorded successfully!")