in jobs/search-alert/search_alert/main.py [0:0]
def main(project_id, submission_date, dry_run):
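"""Compute daily desktop search-alert features and append them to BigQuery.

project_id: GCP project used for the BigQuery client.
submission_date: date of the partition to process; bound to the @submission_date DATE parameter.
dry_run: if True, run the queries and checks but skip all table writes.
"""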
query_statement = """
WITH
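-- new_data: per-country, per-engine totals of the search metrics for @submission_date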
new_data AS (
SELECT
submission_date,
country,
normalized_engine AS engine,
SUM(sap) AS sap,
SUM(tagged_sap) AS tagged_sap,
SUM(tagged_follow_on) AS tagged_follow_on,
SUM(search_with_ads) AS search_with_ads,
SUM(ad_click) AS ad_click
FROM
`mozdata.search.search_aggregates`
WHERE
submission_date = @submission_date
GROUP BY
1,
2,
3 ),
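-- long_data: unpivot the five metric columns into (metric, value) rows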
long_data AS (
SELECT
submission_date,
country,
engine,
metric,
value
FROM (
SELECT
submission_date,
country,
engine,
'sap' AS metric,
sap AS value
FROM
new_data)
UNION ALL (
SELECT
submission_date,
country,
engine,
'tagged_sap' AS metric,
tagged_sap AS value
FROM
new_data)
UNION ALL (
SELECT
submission_date,
country,
engine,
'tagged_follow_on' AS metric,
tagged_follow_on AS value
FROM
new_data)
UNION ALL (
SELECT
submission_date,
country,
engine,
'search_with_ads' AS metric,
search_with_ads AS value
FROM
new_data)
UNION ALL (
SELECT
submission_date,
country,
engine,
'ad_click' AS metric,
ad_click AS value
FROM
new_data) ),
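-- full_data: add an "ALL"-engines rollup per country/metric and keep rows for the listed engines individually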
full_data AS (
SELECT
submission_date,
country,
engine,
metric,
value
FROM (
SELECT
submission_date,
country,
"ALL" as engine,
metric,
sum(value) as value
FROM long_data
GROUP BY 1,2,3,4)
UNION ALL (
SELECT
submission_date,
country,
engine,
metric,
value
FROM long_data
WHERE engine IN ("Google", "Bing", "DuckDuckGo", "DDG", "Yandex")
)),
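-- extended_new_data: prepend the stored 30-day history (new_data_index = FALSE) to today's rows (new_data_index = TRUE)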
extended_new_data AS (
SELECT
submission_date,
country,
new_data_index,
metric,
engine,
value
FROM (
SELECT
DATE(submission_date) AS submission_date,
country,
FALSE AS new_data_index,
metric,
engine,
value
FROM
`mozdata.analysis.desktop_search_alert_historical_data`
WHERE
DATE(submission_date) >= DATE_SUB(@submission_date, INTERVAL 30 DAY)
AND DATE(submission_date) < @submission_date )
UNION ALL (
SELECT
DATE(submission_date) AS submission_date,
country,
TRUE AS new_data_index,
metric,
engine,
value
FROM
full_data)
ORDER BY
submission_date,
country )
SELECT
*
FROM
extended_new_data
"""
client = bigquery.Client(project = project_id)
query_job_config = bigquery.QueryJobConfig(
query_parameters=[
bigquery.ScalarQueryParameter("submission_date", "DATE", submission_date),
]
)
query_job = client.query(query_statement, job_config=query_job_config)
print("Running query:", query_job.job_id)
search_data = query_job.result().to_dataframe()
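# Sort so that, within each (country, metric, engine) group, rows are in date order; the groupby().shift(n)
# calls below then return the value from n rows earlier (n days back, assuming no missing dates).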
search_data = search_data.sort_values(by=['submission_date', 'country', 'metric'])
search_data['submission_date'] = pd.to_datetime(search_data['submission_date'])
# With today as day 0, what was the value on day -1, -2, -7, -14, -21, -28?
search_data['value_prev1d'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(1)
search_data['value_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(2)
search_data['value_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(7)
search_data['value_prev2w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(14)
search_data['value_prev3w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(21)
search_data['value_prev4w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(28)
# With today as day 0, what is today's value relative to day -1, -2, -7, -14, -21, -28 (ratios)?
search_data['dod'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(1)
search_data['do2d'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(2)
search_data['wow'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(7)
search_data['wo2w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(14)
search_data['wo3w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(21)
search_data['wo4w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(28)
# With today as day 0, what share of the all-country total does this country contribute for the metric/engine, and what was that share on day -1, -2, -7, -8?
search_data['pcnt_value'] = search_data['value']/search_data.groupby(['submission_date', 'metric', 'engine']).value.transform('sum')
search_data['pcnt_value_prevd'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(1)
search_data['pcnt_value_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(2)
search_data['pcnt_value_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(7)
search_data['pcnt_value_prevd_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(8)
# In terms of dod, what did day -1 and day -2 look like?
search_data['dod_prevd'] = search_data.groupby(['country', 'metric', 'engine']).dod.shift(1)
search_data['dod_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).dod.shift(2)
# In terms of wow, what did day -1 and day -2 look like?
search_data['wow_prevd'] = search_data.groupby(['country', 'metric', 'engine']).wow.shift(1)
search_data['wow_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).wow.shift(2)
# How does today's dod compare with the dod on the same day last week?
search_data['wow_in_dod'] = search_data['dod']/search_data.groupby(['country', 'metric', 'engine']).dod.shift(7)
# How does today's wow compare with the wow yesterday and the day before yesterday?
search_data['dod_in_wow'] = search_data['wow']/search_data.groupby(['country', 'metric', 'engine']).wow.shift(1)
search_data['do2d_in_wow'] = search_data['wow']/search_data.groupby(['country', 'metric', 'engine']).wow.shift(2)
# Only keep the newly computed rows (new_data_index == True) before appending them to the table
search_data = search_data.loc[search_data.new_data_index]
search_data = search_data.drop(columns=['new_data_index'])
if dry_run:
print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_historical_data'")
else:
print("Updating 'mozdata.analysis.desktop_search_alert_historical_data'")
job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
job = client.load_table_from_dataframe(search_data, 'mozdata.analysis.desktop_search_alert_historical_data', job_config = job_config)
job.result()
"""### General Rule without estimate to identify outlier"""
# TODO: can we order the conditions in descending severity?
# pandas dt.dayofweek: Mon=0, Tue=1, Wed=2, Thu=3, Fri=4, Sat=5, Sun=6
search_data['dayofweek'] = search_data['submission_date'].dt.dayofweek
# Note: np.select uses the choice for the first condition that matches each row, so add a new criterion only if the rows it targets are not already caught by an earlier condition.
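# e.g. np.select([cond_a, cond_b], [-2, -1], default=0) yields -2 wherever cond_a is True, even if cond_b is also True.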
conditions = [
### Overall engines
# Rule 1: drops we want to capture on day 1
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.001) & (search_data['dod'] < 0.1), # decrease (-2)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.005) & (search_data['dod'] < 0.3), # decrease (-2)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.015) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-2)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.002) & (search_data['dod'] < 0.25), # decrease (-1)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.005) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-1)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.025) & (search_data['wow'] < 0.8) & (search_data['dod_in_wow'] < 0.8), # decrease (-1)
# Increases we want to capture on day 1 -- less sensitive than the drop rules
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['dod'] > 3) & (search_data['dayofweek'] != 0), # increase (1)
(search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['wow'] > 2) & (search_data['dod_in_wow'] > 2), # increase (1)
# Rule 2: capture the drop at the end of day 2 if it was not caught on day 1
# if (1) do2d dropped below 60%, (2) wow below 50%, (3) dod below 90%, and (4) the contribution two days ago was above 0.2%, on a weekday
(search_data.engine == 'ALL') & (search_data.value_prev2d > 10000) & (search_data['pcnt_value_prev2d'] > 0.002) & (search_data['wow'] < 0.5) \
& (search_data['dod'] < 0.9) & (search_data['do2d'] < 0.6) & (search_data['dayofweek'] < 5),
# if wow dropped below 75% two days in a row, for a country whose share of the total was >= 5% eight days ago
(search_data.engine == 'ALL') & (search_data.pcnt_value_prevd_prev1w >= 0.05) & (search_data['wow'] < 0.75) & (search_data['wow_prevd'] < 0.75),
# increase
(search_data.engine == 'ALL') & (search_data.value > 10000) & (search_data['pcnt_value'] > 0.003) & (search_data['wow'] > 1.5*1.0/0.6) & (search_data['do2d'] > 1.5*1.0/0.4),
### For individual engines
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['dod'] < 0.1), # decrease (-2)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.03) & (search_data['dod'] < 0.4) & (search_data['dayofweek'] < 5), # not on Sat/Sun, decrease (-2)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-2)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.03) & (search_data['dod'] < 0.25) & (search_data['dayofweek'] < 5), # decrease (-1)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-1)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.10) & (search_data['wow'] < 0.8) & (search_data['dod_in_wow'] < 0.8), # decrease (-1)
# Increases we want to capture on day 1 -- less sensitive than the drop rules
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] > 4) & (search_data['dayofweek'] <= 1), # increase (1)
(search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['wow'] > 4) & (search_data['dod_in_wow'] > 2), # increase (1)
# Rule 2: capture the drop at the end of day 2 if it was not caught on day 1
# if (1) do2d dropped below 60%, (2) wow below 50%, (3) dod below 90%, and (4) the contribution two days ago was above 3%, on a weekday
(search_data.engine != 'ALL') & (search_data.value_prev2d > 10000) & (search_data['pcnt_value_prev2d'] > 0.03) & (search_data['wow'] < 0.5) \
& (search_data['dod'] < 0.9) & (search_data['do2d'] < 0.6) & (search_data['dayofweek'] < 5),
# if wow dropped below 60% two days in a row, for an engine/country whose share of the total was >= 5% eight days ago
(search_data.engine != 'ALL') & (search_data.pcnt_value_prevd_prev1w >= 0.05) & (search_data['wow'] < 0.6) & (search_data['wow_prevd'] < 0.6),
# increase
#(search_data.engine != 'ALL') & (search_data.value> 10000) & (search_data['pcnt_value'] > 0.05) & (search_data['wow'] > 1.5*1.0/0.6) & (search_data['do2d'] > 1.5*1.0/0.4)
]
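# choices maps one-to-one onto conditions above: -2 and -1 flag drops (more and less severe), 1 flags an increase; rows matching no condition get the default 0 (no anomaly).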
choices = [-2, -2, -2, -1, -1, -1, 1, 1, -1, -1, 1, -2, -2, -2, -1, -1, -1, 1, 1, -1, -1]
search_data['abnormal'] = np.select(conditions, choices, default=0)
abnormality_data = search_data.loc[(search_data.abnormal != 0)].copy()
abnormality_data['is_holiday'] = [is_it_holiday(abnormality_data.iloc[i]['submission_date'], abnormality_data.iloc[i]['country']) for i in range(abnormality_data.shape[0])]
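# Integer form of the abnormality date as days since the Unix epoch; the daily-latest query below takes MAX(latest_abnormality_in_days).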
abnormality_data['latest_abnormality_in_days'] = (abnormality_data['submission_date']-pd.to_datetime('1970-01-01')).dt.days
# If any new abnormalities were flagged, append them to the alert records table
if not abnormality_data.empty:
if dry_run:
print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_records'")
else:
print("Updating 'mozdata.analysis.desktop_search_alert_records'")
job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
job = client.load_table_from_dataframe(abnormality_data, 'mozdata.analysis.desktop_search_alert_records', job_config = job_config)
job.result()
# Append the latest abnormality date to 'mozdata.analysis.desktop_search_alert_latest_daily' every day so the Looker alert can trigger properly.
# Looker's time-series alert checks the last two rows of the table, so we must append a row daily even when there is no abnormality; otherwise the alert would not work.
query_statement = """
WITH
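-- no_holiday_update: latest abnormality date (abnormal = -2 or 2) excluding records flagged as holidays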
no_holiday_update AS (
SELECT
@submission_date AS asof,
"No" AS is_holiday,
DATE(MAX(submission_date)) AS latest_abnormality_date,
MAX(latest_abnormality_in_days) AS latest_abnormality_date_int
FROM
`mozdata.analysis.desktop_search_alert_records`
WHERE
is_holiday IS FALSE
AND (abnormal = -2 OR abnormal = 2)
AND DATE(submission_date) <= @submission_date
GROUP BY
1,
2 ),
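-- all_update: latest abnormality date (abnormal = -2 or 2) including holiday-flagged records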
all_update AS (
SELECT
@submission_date AS asof,
"All" AS is_holiday,
DATE(MAX(submission_date)) AS latest_abnormality_date,
MAX(latest_abnormality_in_days) AS latest_abnormality_date_int
FROM
`mozdata.analysis.desktop_search_alert_records`
WHERE
(abnormal = -2 OR abnormal = 2)
AND DATE(submission_date) <= @submission_date
GROUP BY
1,
2 )
SELECT
asof,
is_holiday,
latest_abnormality_date,
latest_abnormality_date_int
FROM
no_holiday_update
UNION ALL (
SELECT
asof,
is_holiday,
latest_abnormality_date,
latest_abnormality_date_int
FROM
all_update)
"""
query_job = client.query(query_statement, job_config=query_job_config)
print("Running query:", query_job.job_id)
latest_alert_data = (query_job.result().to_dataframe())
if dry_run:
print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_latest_daily'")
else:
print("Updating 'mozdata.analysis.desktop_search_alert_latest_daily'")
job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
job = client.load_table_from_dataframe(latest_alert_data, 'mozdata.analysis.desktop_search_alert_latest_daily', job_config = job_config)
job.result()