# jobs/search-alert/search_alert/main.py
"""Identifies and records abnormalities in daily search metrics at country level"""
import click
import datetime as dt
import holidays
import numpy as np
import pandas as pd
from google.cloud import bigquery
# Check whether the date, or a nearby day (2 days before through 1 day after,
# to be lenient around weekends), falls on a holiday in the given country.
def is_it_holiday(ds, country):
    is_holiday = False
    ds_range = [pd.to_datetime(ds) + dt.timedelta(days=x) for x in range(-2, 2)]
    try:
        # holidays.CountryHoliday raises for unsupported country codes; treat those as non-holidays
        country_holidays = holidays.CountryHoliday(country)
        is_holiday = any(d in country_holidays for d in ds_range)
    except Exception:
        pass
    if country == 'CN':
        # For China, check National Day (Oct 1) explicitly; this overrides the library result
        is_holiday = any((x.month == 10) and (x.day == 1) for x in ds_range)
    if not is_holiday:
        is_holiday = any((x.month == 12) and (x.day >= 25) for x in ds_range)  # Christmas
    if not is_holiday:
        is_holiday = any((x.month == 1) and (x.day == 1) for x in ds_range)  # New Year's Day
    if not is_holiday:
        is_holiday = any((x.month == 5) and (x.day == 1) for x in ds_range)  # Labor Day (May 1)
    return is_holiday
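# Example usage (illustrative): is_it_holiday("2023-07-04", "US") returns True,
# since Independence Day falls inside the window being checked.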
# Days elapsed since the Unix epoch (1970-01-01) for a Series of dates
def get_days_since_1970(adate):
    return (pd.to_datetime(adate) - pd.Timestamp("1970-01-01")).dt.days
@click.command()
@click.option("--project_id", required=True)
@click.option("--submission_date", required=True)
@click.option('--dry_run', is_flag=True, default=False)
def main(project_id, submission_date, dry_run):
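    # Query outline: `new_data` aggregates the search metrics for @submission_date by
    # country and normalized engine; `long_data` unpivots the five metrics into
    # (metric, value) rows; `full_data` adds an "ALL"-engine rollup next to the major
    # engines; `extended_new_data` prepends the prior 30 days of history so the
    # day-over-day / week-over-week features below can be computed.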
query_statement = """
WITH
new_data AS (
SELECT
submission_date,
country,
normalized_engine as engine,
SUM(sap) AS sap,
SUM(tagged_sap) AS tagged_sap,
SUM(tagged_follow_on) AS tagged_follow_on,
SUM(search_with_ads) AS search_with_ads,
SUM(ad_click) AS ad_click
FROM
`mozdata.search.search_aggregates`
WHERE
submission_date = @submission_date
GROUP BY
1,
2,
3 ),
long_data AS (
SELECT
submission_date,
country,
engine,
metric,
value
FROM (
SELECT
submission_date,
country,
engine,
'sap' AS metric,
sap AS value
FROM
new_data)
UNION ALL (
SELECT
submission_date,
country,
engine,
'tagged_sap' AS metric,
tagged_sap AS value
FROM
new_data)
UNION ALL (
SELECT
submission_date,
country,
engine,
'tagged_follow_on' AS metric,
tagged_follow_on AS value
FROM
new_data)
UNION ALL (
SELECT
submission_date,
country,
engine,
'search_with_ads' AS metric,
search_with_ads AS value
FROM
new_data)
UNION ALL (
SELECT
submission_date,
country,
engine,
'ad_click' AS metric,
ad_click AS value
FROM
new_data) ),
full_data AS (
SELECT
submission_date,
country,
engine,
metric,
value
FROM (
SELECT
submission_date,
country,
"ALL" as engine,
metric,
sum(value) as value
FROM long_data
GROUP BY 1,2,3,4)
UNION ALL (
SELECT
submission_date,
country,
engine,
metric,
value
FROM long_data
WHERE engine in ("Google", "Bing", "DuckDuckGo", "DDG", "Yandex")
)),
extended_new_data AS (
SELECT
submission_date,
country,
new_data_index,
metric,
engine,
value
FROM (
SELECT
DATE(submission_date) AS submission_date,
country,
FALSE AS new_data_index,
metric,
engine,
value
FROM
`mozdata.analysis.desktop_search_alert_historical_data`
WHERE
DATE(submission_date) >= DATE_SUB(@submission_date, INTERVAL 30 DAY)
AND DATE(submission_date) < @submission_date )
UNION ALL (
SELECT
DATE(submission_date) AS submission_date,
country,
TRUE AS new_data_index,
metric,
engine,
value
FROM
full_data)
ORDER BY
submission_date,
country )
SELECT
*
FROM
extended_new_data
"""
client = bigquery.Client(project = project_id)
query_job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("submission_date", "DATE", submission_date),
]
)
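    # The same parameterized job config is reused below for the daily-alert query,
    # which takes the identical @submission_date parameter.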
query_job = client.query(query_statement, job_config=query_job_config)
print("Running query:", query_job.job_id)
search_data = (query_job.result().to_dataframe())
search_data = search_data.sort_values(by=['submission_date', 'country', 'metric'])
search_data['submission_date'] = pd.to_datetime(search_data['submission_date'])
    # With today as day 0, record each group's value on days -1, -2, -7, -14, -21, and -28
search_data['value_prev1d'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(1)
search_data['value_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(2)
search_data['value_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(7)
search_data['value_prev2w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(14)
search_data['value_prev3w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(21)
search_data['value_prev4w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(28)
    # With today as day 0, ratio of today's value to the value on days -1, -2, -7, -14, -21, and -28
search_data['dod'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(1)
search_data['do2d'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(2)
search_data['wow'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(7)
search_data['wo2w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(14)
search_data['wo3w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(21)
search_data['wo4w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(28)
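    # Illustration (hypothetical numbers): if a (country, metric, engine) group has daily
    # values [100, 80, 40], shift(1) aligns [NaN, 100, 80] against them, so the dod ratio
    # on the last day is 40 / 80 = 0.5, i.e. a 50% day-over-day drop.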
    # Share of the global total (per metric/engine/day) contributed by this country today, and that share on days -1, -2, -7, and -8
    search_data['pcnt_value'] = search_data['value']/search_data.groupby(['submission_date', 'metric', 'engine']).value.transform("sum")
search_data['pcnt_value_prevd'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(1)
search_data['pcnt_value_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(2)
search_data['pcnt_value_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(7)
search_data['pcnt_value_prevd_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(8)
    # dod as observed on days -1 and -2
search_data['dod_prevd'] = search_data.groupby(['country', 'metric', 'engine']).dod.shift(1)
search_data['dod_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).dod.shift(2)
    # wow as observed on days -1 and -2
search_data['wow_prevd'] = search_data.groupby(['country', 'metric', 'engine']).wow.shift(1)
search_data['wow_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).wow.shift(2)
    # Today's dod relative to the dod on the same weekday last week
search_data['wow_in_dod'] = search_data['dod']/search_data.groupby(['country', 'metric', 'engine']).dod.shift(7)
    # Today's wow relative to yesterday's wow and to the wow two days ago
search_data['dod_in_wow'] = search_data['wow']/search_data.groupby(['country', 'metric', 'engine']).wow.shift(1)
search_data['do2d_in_wow'] = search_data['wow']/search_data.groupby(['country', 'metric', 'engine']).wow.shift(2)
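    # These composite ratios normalize today's change by the analogous change a day or a
    # week earlier, so a drop that simply repeats last week's pattern registers less
    # strongly than a fresh one.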
    # Keep only the newly computed rows for @submission_date and append them to the history table
    search_data = search_data.loc[search_data.new_data_index].copy()
    search_data = search_data.drop(columns=['new_data_index'])
if dry_run:
print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_historical_data'")
else:
print("Updating 'mozdata.analysis.desktop_search_alert_historical_data'")
job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
job = client.load_table_from_dataframe(search_data, 'mozdata.analysis.desktop_search_alert_historical_data', job_config = job_config)
job.result()
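    # Note: WRITE_APPEND only ever adds rows, so re-running the job for the same
    # submission_date would append duplicate rows to the history table.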
"""### General Rule without estimate to identify outlier"""
# can we order the conditions in descending severity?
# dow: Sat - 5, Sun - 6, Mon - 0, Tue - 1, Wed - 2, Thu - 3, Fri - 4
search_data['dayofweek'] = search_data['submission_date'].dt.dayofweek
# Note, the checking quit after the first satisfying condition is met, so should aim to add criterion only if they won't meet by with previous conditions
conditions = [
### Overall engines
        # Rule 1: drops we want to capture on day 1
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.001) & (search_data['dod'] < 0.1), # decrease (-2)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.005) & (search_data['dod'] < 0.3), # decrease (-2)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.015) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-2)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.002) & (search_data['dod'] < 0.25), # decrease (-1)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.005) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-1)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.025) & (search_data['wow'] < 0.8) & (search_data['dod_in_wow'] < 0.8), # decrease (-1)
        # Increases we want to capture on day 1 -- less sensitive than for drops
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['dod'] > 3) & (search_data['dayofweek'] != 0), # increase (1)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['wow'] > 2) & (search_data['dod_in_wow'] > 2), # increase (1)
        # Rule 2: capture the drop at the end of day 2 if it was not caught on day 1:
        # (1) do2d dropped below 60% (with dod still below 90%), (2) wow below 50%, and (3) the day -2 contribution above 0.2%
(search_data.engine == 'ALL') & (search_data.value_prev2d > 10000) & (search_data['pcnt_value_prev2d'] > 0.002) & (search_data['wow'] < 0.5) \
& (search_data['dod'] < 0.9) & (search_data['do2d'] < 0.6) & (search_data['dayofweek'] < 5),
        # Or: wow dropped below 75% two days in a row for a sufficiently large contributor
(search_data.engine == 'ALL') & (search_data.pcnt_value_prevd_prev1w >= 0.05) & (search_data['wow'] < 0.75) & (search_data['wow_prevd'] < 0.75),
# increase
(search_data.engine == 'ALL') & (search_data.value > 10000) & (search_data['pcnt_value'] > 0.003) & (search_data['wow'] > 1.5*1.0/0.6) & (search_data['do2d'] > 1.5*1.0/0.4),
### For individual engine
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['dod'] < 0.1), # decrease (-2)
        (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.03) & (search_data['dod'] < 0.4) & (search_data['dayofweek'] < 5), # not on Sat/Sun, decrease (-2)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-2)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.03) & (search_data['dod'] < 0.25) & (search_data['dayofweek'] < 5), # decrease (-1)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-1)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.10) & (search_data['wow'] < 0.8) & (search_data['dod_in_wow'] < 0.8), # decrease (-1)
        # Increases we want to capture on day 1 -- less sensitive than for drops
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] > 4) & (search_data['dayofweek'] <= 1), # increase (1)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['wow'] > 4) & (search_data['dod_in_wow'] > 2), # increase (1)
        # Rule 2: capture the drop at the end of day 2 if it was not caught on day 1:
        # (1) do2d dropped below 60% (with dod still below 90%), (2) wow below 50%, and (3) the day -2 contribution above 3%
(search_data.engine != 'ALL') & (search_data.value_prev2d > 10000) & (search_data['pcnt_value_prev2d'] > 0.03) & (search_data['wow'] < 0.5) \
& (search_data['dod'] < 0.9) & (search_data['do2d'] < 0.6) & (search_data['dayofweek'] < 5),
        # Or: wow dropped below 60% two days in a row for a sufficiently large contributor
(search_data.engine != 'ALL') & (search_data.pcnt_value_prevd_prev1w >= 0.05) & (search_data['wow'] < 0.6) & (search_data['wow_prevd'] < 0.6),
# increase
#(search_data.engine != 'ALL') & (search_data.value> 10000) & (search_data['pcnt_value'] > 0.05) & (search_data['wow'] > 1.5*1.0/0.6) & (search_data['do2d'] > 1.5*1.0/0.4)
]
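    # Severity codes aligned one-to-one with the conditions above:
    # -2/-1 mark drops (more/less severe), +1 marks increases, and 0 (the default) means no abnormality.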
choices = [-2, -2, -2, -1, -1, -1, 1, 1, -1, -1, 1, -2, -2, -2, -1, -1, -1, 1, 1, -1, -1]
search_data['abnormal'] = np.select(conditions, choices, default=0)
    abnormality_data = search_data.loc[search_data.abnormal != 0].copy()
    abnormality_data['is_holiday'] = [
        is_it_holiday(row['submission_date'], row['country'])
        for _, row in abnormality_data.iterrows()
    ]
    abnormality_data['latest_abnormality_in_days'] = get_days_since_1970(abnormality_data['submission_date'])
    # If any new abnormalities were found, append them to the alert records table
    if not abnormality_data.empty:
if dry_run:
print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_records'")
else:
print("Updating 'mozdata.analysis.desktop_search_alert_records'")
job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
job = client.load_table_from_dataframe(abnormality_data, 'mozdata.analysis.desktop_search_alert_records', job_config = job_config)
job.result()
    # Append the latest abnormality date to 'mozdata.analysis.desktop_search_alert_latest_daily' every day so the Looker alert triggers correctly.
    # Looker's time-series alert only inspects the last two rows of the table, so a row must be appended daily even when there is no abnormality, otherwise the alert will not work.
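    # The query below produces two summary rows per run: one ignoring holiday-flagged
    # abnormalities ("No") and one counting all of them ("All"), each carrying the most
    # recent severe (abnormal = -2 or 2) abnormality date up to @submission_date.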
query_statement = """
WITH
no_holiday_update AS (
SELECT
@submission_date AS asof,
"No" AS is_holiday,
DATE(MAX(submission_date)) AS latest_abnormality_date,
MAX(latest_abnormality_in_days) AS latest_abnormality_date_int
FROM
`mozdata.analysis.desktop_search_alert_records`
WHERE
is_holiday IS FALSE
AND (abnormal = -2 or abnormal = 2)
AND DATE(submission_date) <= @submission_date
GROUP BY
1,
2 ),
all_update AS (
SELECT
@submission_date AS asof,
"All" AS is_holiday,
DATE(MAX(submission_date)) AS latest_abnormality_date,
MAX(latest_abnormality_in_days) AS latest_abnormality_date_int
FROM
`mozdata.analysis.desktop_search_alert_records`
WHERE
(abnormal = -2 or abnormal = 2)
AND DATE(submission_date) <= @submission_date
GROUP BY
1,
2 )
SELECT
asof,
is_holiday,
latest_abnormality_date,
latest_abnormality_date_int
FROM
no_holiday_update
UNION ALL (
SELECT
asof,
is_holiday,
latest_abnormality_date,
latest_abnormality_date_int
FROM
all_update)
"""
query_job = client.query(query_statement, job_config=query_job_config)
print("Running query:", query_job.job_id)
latest_alert_data = (query_job.result().to_dataframe())
if dry_run:
print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_latest_daily'")
else:
print("Updating 'mozdata.analysis.desktop_search_alert_latest_daily'")
job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
job = client.load_table_from_dataframe(latest_alert_data, 'mozdata.analysis.desktop_search_alert_latest_daily', job_config = job_config)
job.result()
if __name__ == "__main__":
main()