def main()

in jobs/search-alert/search_alert/main.py [0:0]

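# Module-level imports this function relies on (assumed to live near the top of
# main.py, together with the is_it_holiday(date, country) helper used below):
#
#   import numpy as np
#   import pandas as pd
#   from google.cloud import bigquery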

def main(project_id, submission_date, dry_run):
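    """Run the daily desktop search alert job.

    Pulls the day's search aggregates together with the previous 30 days of
    stored history from BigQuery, computes day-over-day and week-over-week
    ratios per (country, metric, engine), flags abnormal values with
    rule-based thresholds, and appends the results to the historical,
    records, and latest-daily tables (writes are skipped when dry_run is set).
    """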

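    # Build one query that (1) aggregates the day's search metrics by country and
    # normalized engine, (2) reshapes them to long format (one row per metric),
    # (3) adds an "ALL"-engine rollup alongside the major engines, and (4) unions
    # the result with the previous 30 days of stored history, marking new rows
    # with new_data_index.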
    query_statement = """
        WITH
        new_data AS (
        SELECT
            submission_date,
            country,
            normalized_engine AS engine,
            SUM(sap) AS sap,
            SUM(tagged_sap) AS tagged_sap,
            SUM(tagged_follow_on) AS tagged_follow_on,
            SUM(search_with_ads) AS search_with_ads,
            SUM(ad_click) AS ad_click
        FROM
            `mozdata.search.search_aggregates`
        WHERE
            submission_date = @submission_date
        GROUP BY
            1,
            2,
            3 ),
        long_data AS (
        SELECT
            submission_date,
            country,
            engine,
            metric,
            value
        FROM (
            SELECT
            submission_date,
            country,
            engine,
            'sap' AS metric,
            sap AS value
            FROM
            new_data)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            'tagged_sap' AS metric,
            tagged_sap AS value
            FROM
            new_data)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            'tagged_follow_on' AS metric,
            tagged_follow_on AS value
            FROM
            new_data)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            'search_with_ads' AS metric,
            search_with_ads AS value
            FROM
            new_data)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            'ad_click' AS metric,
            ad_click AS value
            FROM
            new_data) ),
        full_data AS (
        SELECT
            submission_date,
            country,
            engine,
            metric,
            value
        FROM (
            SELECT
            submission_date,
            country,
            "ALL" AS engine,
            metric,
            SUM(value) AS value
            FROM long_data
            GROUP BY 1, 2, 3, 4)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            metric,
            value
            FROM long_data
            WHERE engine IN ("Google", "Bing", "DuckDuckGo", "DDG", "Yandex")
            )),
        extended_new_data AS (
        SELECT
            submission_date,
            country,
            new_data_index,
            metric,
            engine,
            value
        FROM (
            SELECT
            DATE(submission_date) AS submission_date,
            country,
            FALSE AS new_data_index,
            metric,
            engine,
            value
            FROM
            `mozdata.analysis.desktop_search_alert_historical_data`
            WHERE
            DATE(submission_date) >= DATE_SUB(@submission_date, INTERVAL 30 DAY)
            AND DATE(submission_date) < @submission_date )
        UNION ALL (
            SELECT
            DATE(submission_date) AS submission_date,
            country,
            TRUE AS new_data_index,
            metric,
            engine,
            value
            FROM
            full_data)
        ORDER BY
            submission_date,
            country )
        SELECT
        *
        FROM
        extended_new_data
    """

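    # Execute the query with @submission_date bound as a DATE parameter and load
    # the result into a pandas DataFrame.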
    client = bigquery.Client(project=project_id)

    query_job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("submission_date", "DATE", submission_date),
        ]
    )

    query_job = client.query(query_statement, job_config=query_job_config)
    print("Running query:", query_job.job_id)
    search_data = query_job.result().to_dataframe()

    search_data = search_data.sort_values(by=['submission_date', 'country', 'metric'])
    search_data['submission_date'] = pd.to_datetime(search_data['submission_date'])

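    # Rows are sorted by date within each (country, metric, engine) group, so
    # groupby(...).shift(n) returns the value from n rows (i.e. n days, assuming a
    # contiguous daily series) earlier for the same group.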
    # With today as day 0, look up the value on day -1, -2, -7, -14, -21, -28.
    search_data['value_prev1d'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(1)
    search_data['value_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(2)
    search_data['value_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(7)
    search_data['value_prev2w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(14)
    search_data['value_prev3w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(21)
    search_data['value_prev4w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(28)

    # With today as day 0, compute today's value as a ratio of the value on day -1, -2, -7, -14, -21, -28.
    search_data['dod'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(1)
    search_data['do2d'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(2)
    search_data['wow'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(7)
    search_data['wo2w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(14)
    search_data['wo3w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(21)
    search_data['wo4w'] = search_data['value']/search_data.groupby(['country', 'metric', 'engine']).value.shift(28)

    # With today as day 0, compute today's share of the global (all-country) total,
    # and what that share was on day -1, -2, -7, and -8.
    search_data['pcnt_value'] = search_data['value']/search_data.groupby(['submission_date', 'metric', 'engine']).value.transform('sum')
    search_data['pcnt_value_prevd'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(1)
    search_data['pcnt_value_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(2)
    search_data['pcnt_value_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(7)
    search_data['pcnt_value_prevd_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(8)

    # What the dod ratio was on day -1 and day -2.
    search_data['dod_prevd'] = search_data.groupby(['country', 'metric', 'engine']).dod.shift(1)
    search_data['dod_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).dod.shift(2)

    # What the wow ratio was on day -1 and day -2.
    search_data['wow_prevd'] = search_data.groupby(['country', 'metric', 'engine']).wow.shift(1)
    search_data['wow_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).wow.shift(2)

    # How today's dod compares with the dod on the same day last week.
    search_data['wow_in_dod'] = search_data['dod']/search_data.groupby(['country', 'metric', 'engine']).dod.shift(7)

    # How today's wow compares with yesterday's wow and the wow from two days ago.
    search_data['dod_in_wow'] = search_data['wow']/search_data.groupby(['country', 'metric', 'engine']).wow.shift(1)
    search_data['do2d_in_wow'] = search_data['wow']/search_data.groupby(['country', 'metric', 'engine']).wow.shift(2)

    # Only keep the newly computed rows (new_data_index is TRUE) before appending them to the table
    search_data = search_data.loc[search_data.new_data_index == True]
    search_data = search_data.drop(columns=['new_data_index'])

    if dry_run:
        print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_historical_data'")
    else:
        print("Updating 'mozdata.analysis.desktop_search_alert_historical_data'")
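        # WRITE_APPEND appends the new rows to the existing table rather than overwriting it.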
        job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')
        job = client.load_table_from_dataframe(search_data, 'mozdata.analysis.desktop_search_alert_historical_data', job_config=job_config)
        job.result()

    """### General Rule without estimate to identify outlier"""

    # can we order the conditions in descending severity?
    # dow: Sat - 5, Sun - 6, Mon - 0, Tue - 1, Wed - 2, Thu - 3, Fri - 4

    search_data['dayofweek'] = search_data['submission_date'].dt.dayofweek

    # Note: np.select stops at the first condition that matches, so only add a criterion if the cases it targets are not already caught by an earlier condition.
    conditions = [    
    ### Overall engines
    # Rule 1: drops we want to capture on day 1
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.001) & (search_data['dod'] < 0.1), # decrease (-2)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.005) & (search_data['dod'] < 0.3), # decrease (-2)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.015) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-2)  

    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.002) & (search_data['dod'] < 0.25), # decrease (-1)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.005) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-1)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.025) & (search_data['wow'] < 0.8) & (search_data['dod_in_wow'] < 0.8), # decrease (-1)

    # Increases we want to capture on day 1 -- less sensitive than for drops
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['dod'] > 3) & (search_data['dayofweek'] != 0), # increase (1)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['wow'] > 2) & (search_data['dod_in_wow'] > 2), # increase (1)

    # Rule 2: capture the drop at the end of day 2 if it was not caught on day 1:
    # (1) do2d < 0.6, (2) wow < 0.5, (3) dod < 0.9, and (4) previous-2-day contribution > 0.2%, with at least 10k searches two days prior; weekdays only
    (search_data.engine == 'ALL') & (search_data.value_prev2d > 10000) & (search_data['pcnt_value_prev2d'] > 0.002) & (search_data['wow'] < 0.5) \
       & (search_data['dod'] < 0.9) & (search_data['do2d'] < 0.6) & (search_data['dayofweek'] < 5), 
    # Or: wow dropped below 75% two days in a row
    (search_data.engine == 'ALL') & (search_data.pcnt_value_prevd_prev1w >= 0.05) & (search_data['wow'] < 0.75) & (search_data['wow_prevd'] < 0.75),
    # increase
    (search_data.engine == 'ALL') & (search_data.value > 10000) & (search_data['pcnt_value'] > 0.003) & (search_data['wow'] > 1.5*1.0/0.6) & (search_data['do2d'] > 1.5*1.0/0.4), 
	    
    ### For individual engines
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['dod'] < 0.1), # decrease (-2)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.03) & (search_data['dod'] < 0.4) & (search_data['dayofweek'] < 5), # not on Sat/Sun; decrease (-2)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-2)  

    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.03) & (search_data['dod'] < 0.25) & (search_data['dayofweek'] < 5), # decrease (-1)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-1)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.10) & (search_data['wow'] < 0.8) & (search_data['dod_in_wow'] < 0.8), # decrease (-1)

    # Increases we want to capture on day 1 -- less sensitive than for drops
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] > 4) & (search_data['dayofweek'] <= 1), # increase (1)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['wow'] > 4) & (search_data['dod_in_wow'] > 2), # increase (1)

    # Rule 2: capture the drop at the end of day 2 if it was not caught on day 1:
    # (1) do2d < 0.6, (2) wow < 0.5, (3) dod < 0.9, and (4) previous-2-day contribution > 3%, with at least 10k searches two days prior; weekdays only
    (search_data.engine != 'ALL') & (search_data.value_prev2d > 10000) & (search_data['pcnt_value_prev2d'] > 0.03) & (search_data['wow'] < 0.5) \
       & (search_data['dod'] < 0.9) & (search_data['do2d'] < 0.6) & (search_data['dayofweek'] < 5), 
    # Or: wow dropped below 60% two days in a row
    (search_data.engine != 'ALL') & (search_data.pcnt_value_prevd_prev1w >= 0.05) & (search_data['wow'] < 0.6) & (search_data['wow_prevd'] < 0.6),
    # increase
    #(search_data.engine != 'ALL') & (search_data.value> 10000) & (search_data['pcnt_value'] > 0.05) & (search_data['wow'] > 1.5*1.0/0.6) & (search_data['do2d'] > 1.5*1.0/0.4) 

    ]

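    # np.select pairs conditions[i] with choices[i]: -2/-1 flag drops of higher/lower
    # severity, +1 flags increases, and rows matching no condition default to 0 (normal).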
    choices = [-2, -2, -2, -1,  -1, -1, 1, 1, -1, -1, 1, -2, -2, -2, -1, -1, -1, 1, 1, -1, -1]
    search_data['abnormal'] = np.select(conditions, choices, default=0)
    abnormality_data = search_data.loc[search_data.abnormal != 0].copy()
    abnormality_data['is_holiday'] = [
        is_it_holiday(row['submission_date'], row['country']) for _, row in abnormality_data.iterrows()
    ]
    abnormality_data['latest_abnormality_in_days'] = (abnormality_data['submission_date'] - pd.to_datetime('1970-01-01')).dt.days
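    # latest_abnormality_in_days stores the date as days since the Unix epoch, so the
    # latest-daily query below can take a simple MAX() over it.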

    # if there is newly added abnormality data, then add it to the alert records
    if not abnormality_data.empty:
        if dry_run:
            print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_records'")
        else:
            print("Updating 'mozdata.analysis.desktop_search_alert_records'")
            job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')
            job = client.load_table_from_dataframe(abnormality_data, 'mozdata.analysis.desktop_search_alert_records', job_config=job_config)
            job.result()
    # Append the latest abnormality date to 'mozdata.analysis.desktop_search_alert_latest_daily' each day so the Looker alert can be triggered properly.
    # In Looker, a time-series alert checks the last 2 rows of the table, so we need to append daily even when there is no abnormality, otherwise the alert won't fire correctly.
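    # The query emits two rows per run: one excluding holiday-flagged records ("No")
    # and one over all records ("All"), each carrying the most recent date with a
    # severe (abnormal = -2 or 2) flag.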
    query_statement = """
        WITH
        no_holiday_update AS (
        SELECT
            @submission_date AS asof,
            "No" AS is_holiday,
            DATE(MAX(submission_date)) AS latest_abnormality_date,
            MAX(latest_abnormality_in_days) AS latest_abnormality_date_int
        FROM
            `mozdata.analysis.desktop_search_alert_records`
        WHERE
            is_holiday IS FALSE
            AND (abnormal = -2 OR abnormal = 2)
            AND DATE(submission_date) <= @submission_date
        GROUP BY
            1,
            2 ),
        all_update AS (
        SELECT
            @submission_date AS asof,
            "All" AS is_holiday,
            DATE(MAX(submission_date)) AS latest_abnormality_date,
            MAX(latest_abnormality_in_days) AS latest_abnormality_date_int
        FROM
            `mozdata.analysis.desktop_search_alert_records`
        WHERE
            (abnormal = -2 OR abnormal = 2)
            AND DATE(submission_date) <= @submission_date
        GROUP BY
            1,
            2 )
        SELECT
        asof,
        is_holiday,
        latest_abnormality_date,
        latest_abnormality_date_int
        FROM
        no_holiday_update
        UNION ALL (
        SELECT
            asof,
            is_holiday,
            latest_abnormality_date,
            latest_abnormality_date_int
        FROM
            all_update)
    """

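    # Reuse query_job_config from above so @submission_date is bound for this query as well.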
    query_job = client.query(query_statement, job_config=query_job_config)
    print("Running query:", query_job.job_id)
    latest_alert_data = query_job.result().to_dataframe()
    
    if dry_run:
        print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_latest_daily'")
    else:
        print("Updating 'mozdata.analysis.desktop_search_alert_latest_daily'")
        job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')
        job = client.load_table_from_dataframe(latest_alert_data, 'mozdata.analysis.desktop_search_alert_latest_daily', job_config=job_config)
        job.result()