"""Identifies and records abnormalities in daily search metrics at country level"""

import click
import datetime as dt
import holidays
import numpy as np
import pandas as pd
from google.cloud import bigquery


# Check whether the day, or a nearby day (-2 to +1 days), falls on a holiday,
# so holidays adjacent to a weekend are also caught.
def is_it_holiday(ds, country):
    is_holiday = False
    ds_range = [pd.to_datetime(ds) + dt.timedelta(days=x) for x in range(-2, 2)]
    try:
        country_holidays = holidays.CountryHoliday(country)
        is_holiday = any(d in country_holidays for d in ds_range)
    except Exception:
        # country not supported by the holidays package
        pass
    if country == 'CN':
        # also treat China National Day (Oct 1) as a holiday
        is_holiday = is_holiday or any((x.month == 10) and (x.day == 1) for x in ds_range)
    if not is_holiday:
        # Christmas
        is_holiday = any((x.month == 12) and (x.day >= 25) for x in ds_range)
    if not is_holiday:
        # New Year's Day
        is_holiday = any((x.month == 1) and (x.day == 1) for x in ds_range)
    if not is_holiday:
        # International Labor Day
        is_holiday = any((x.month == 5) and (x.day == 1) for x in ds_range)
    return is_holiday
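# Illustrative check (assumes the holidays package knows US federal holidays):
#   is_it_holiday('2023-07-05', 'US') -> True   (July 4 falls inside the -2..+1 day window)
#   is_it_holiday('2023-03-14', 'US') -> False  (no nearby holiday)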


def get_days_since_1970(adate):
    # expects a Series-like of dates; returns days elapsed since the Unix epoch
    return (pd.to_datetime(adate) - pd.Timestamp('1970-01-01')).dt.days
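# Illustrative: get_days_since_1970(pd.Series(['1970-01-02', '1971-01-01'])) -> Series([1, 365])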


@click.command()
@click.option("--project_id", required=True)
@click.option("--submission_date", required=True)
@click.option('--dry_run', is_flag=True, default=False)
def main(project_id, submission_date, dry_run):

    query_statement = """
        WITH
        new_data AS (
        SELECT
            submission_date,
            country,
            normalized_engine AS engine,
            SUM(sap) AS sap,
            SUM(tagged_sap) AS tagged_sap,
            SUM(tagged_follow_on) AS tagged_follow_on,
            SUM(search_with_ads) AS search_with_ads,
            SUM(ad_click) AS ad_click
        FROM
            `mozdata.search.search_aggregates`
        WHERE
            submission_date = @submission_date
        GROUP BY
            1,
            2,
            3 ),
        long_data AS (
        SELECT
            submission_date,
            country,
            engine,
            metric,
            value
        FROM (
            SELECT
            submission_date,
            country,
            engine,
            'sap' AS metric,
            sap AS value
            FROM
            new_data)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            'tagged_sap' AS metric,
            tagged_sap AS value
            FROM
            new_data)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            'tagged_follow_on' AS metric,
            tagged_follow_on AS value
            FROM
            new_data)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            'search_with_ads' AS metric,
            search_with_ads AS value
            FROM
            new_data)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            'ad_click' AS metric,
            ad_click AS value
            FROM
            new_data) ),
        full_data AS (
        SELECT
            submission_date,
            country,
            engine,
            metric,
            value
        FROM (
            SELECT
            submission_date,
            country,
            "ALL" AS engine,
            metric,
            SUM(value) AS value
            FROM long_data
            GROUP BY 1, 2, 3, 4)
        UNION ALL (
            SELECT
            submission_date,
            country,
            engine,
            metric,
            value
            FROM long_data
            WHERE engine IN ("Google", "Bing", "DuckDuckGo", "DDG", "Yandex")
            )),
        extended_new_data AS (
        SELECT
            submission_date,
            country,
            new_data_index,
            metric,
            engine,
            value
        FROM (
            SELECT
            DATE(submission_date) AS submission_date,
            country,
            FALSE AS new_data_index,
            metric,
            engine,
            value
            FROM
            `mozdata.analysis.desktop_search_alert_historical_data`
            WHERE
            DATE(submission_date) >= DATE_SUB(@submission_date, INTERVAL 30 DAY)
            AND DATE(submission_date) < @submission_date )
        UNION ALL (
            SELECT
            DATE(submission_date) AS submission_date,
            country,
            TRUE AS new_data_index,
            metric,
            engine,
            value
            FROM
            full_data)
        ORDER BY
            submission_date,
            country )
        SELECT
        *
        FROM
        extended_new_data
    """

    client = bigquery.Client(project = project_id)

    query_job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("submission_date", "DATE", submission_date),
        ]
    )

    query_job = client.query(query_statement, job_config=query_job_config)
    print("Running query:", query_job.job_id)
    search_data = (query_job.result().to_dataframe())

    search_data = search_data.sort_values(by=['submission_date', 'country', 'metric'])
    search_data['submission_date'] = pd.to_datetime(search_data['submission_date'])

    # With today as day 0, the value on day -1, -2, -7, -14, -21, -28
    search_data['value_prev1d'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(1)
    search_data['value_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(2)
    search_data['value_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(7)
    search_data['value_prev2w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(14)
    search_data['value_prev3w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(21)
    search_data['value_prev4w'] = search_data.groupby(['country', 'metric', 'engine']).value.shift(28)

    # With today as day 0, today's value relative to day -1, -2, -7, -14, -21, -28
    search_data['dod'] = search_data['value']/search_data['value_prev1d']
    search_data['do2d'] = search_data['value']/search_data['value_prev2d']
    search_data['wow'] = search_data['value']/search_data['value_prev1w']
    search_data['wo2w'] = search_data['value']/search_data['value_prev2w']
    search_data['wo3w'] = search_data['value']/search_data['value_prev3w']
    search_data['wo4w'] = search_data['value']/search_data['value_prev4w']
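    # Reading the ratios: dod = 0.4 means today's volume is 40% of yesterday's (a 60% drop);
    # wow = 2.0 means volume doubled versus the same weekday last week.
    # e.g. value = 4000 with value_prev1d = 10000 gives dod = 0.4.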

    # With today as day 0, today's contribution to the global (all-country) total for the metric/engine, and the same share on day -1, -2, -7, -8
    search_data['pcnt_value'] = search_data['value']/search_data.groupby(['submission_date', 'metric', 'engine']).value.transform('sum')
    search_data['pcnt_value_prevd'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(1)
    search_data['pcnt_value_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(2)
    search_data['pcnt_value_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(7)
    search_data['pcnt_value_prevd_prev1w'] = search_data.groupby(['country', 'metric', 'engine']).pcnt_value.shift(8)

    # the day-over-day ratio as it stood on day -1 and day -2
    search_data['dod_prevd'] = search_data.groupby(['country', 'metric', 'engine']).dod.shift(1)
    search_data['dod_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).dod.shift(2)

    # the week-over-week ratio as it stood on day -1 and day -2
    search_data['wow_prevd'] = search_data.groupby(['country', 'metric', 'engine']).wow.shift(1)
    search_data['wow_prev2d'] = search_data.groupby(['country', 'metric', 'engine']).wow.shift(2)

    # today's day-over-day ratio compared with the day-over-day ratio on the same weekday last week
    search_data['wow_in_dod'] = search_data['dod']/search_data.groupby(['country', 'metric', 'engine']).dod.shift(7)

    # today's week-over-week ratio compared with the week-over-week ratio yesterday and the day before
    search_data['dod_in_wow'] = search_data['wow']/search_data.groupby(['country', 'metric', 'engine']).wow.shift(1)
    search_data['do2d_in_wow'] = search_data['wow']/search_data.groupby(['country', 'metric', 'engine']).wow.shift(2)
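    # Illustration: if wow = 0.5 today and wow was 1.0 yesterday, dod_in_wow = 0.5,
    # i.e. the week-over-week trend worsened by half within a single day.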

    # Keep only the newly queried rows (new_data_index == True) before appending to the table
    search_data = search_data.loc[search_data.new_data_index == True]
    search_data = search_data.drop(columns=['new_data_index'])

    if dry_run:
        print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_historical_data'")
    else:
        print("Updating 'mozdata.analysis.desktop_search_alert_historical_data'")
        job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
        job = client.load_table_from_dataframe(search_data, 'mozdata.analysis.desktop_search_alert_historical_data', job_config = job_config)
        job.result()

    """### General Rule without estimate to identify outlier"""

    # can we order the conditions in descending severity?
    # pandas dayofweek: Mon=0, Tue=1, Wed=2, Thu=3, Fri=4, Sat=5, Sun=6

    search_data['dayofweek'] = search_data['submission_date'].dt.dayofweek

    # Note: np.select uses the first matching condition for each row, so only add a new criterion if the rows it targets are not already caught by an earlier condition
    conditions = [    
    ### Overall engines
    # Rule 1: drops we want to capture on day 1
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.001) & (search_data['dod'] < 0.1), # decrease (-2)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.005) & (search_data['dod'] < 0.3), # decrease (-2)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.015) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-2)  

    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.002) & (search_data['dod'] < 0.25), # decrease (-1)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.005) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-1)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.025) & (search_data['wow'] < 0.8) & (search_data['dod_in_wow'] < 0.8), # decrease (-1)

    # increases we want to capture on day 1 -- less sensitive than the drop rules
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['dod'] > 3) & (search_data['dayofweek'] != 0), # increase (1)
    (search_data.engine == 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['wow'] > 2) & (search_data['dod_in_wow'] > 2), # increase (1)

    # Rule 2: capture the drop at the end of the 2nd day if it wasn't caught on the 1st day
    # flag if volume two days ago > 10k and its contribution > 0.2%, wow < 50%, dod < 90%, do2d < 60%, on a weekday
    (search_data.engine == 'ALL') & (search_data.value_prev2d > 10000) & (search_data['pcnt_value_prev2d'] > 0.002) & (search_data['wow'] < 0.5) \
       & (search_data['dod'] < 0.9) & (search_data['do2d'] < 0.6) & (search_data['dayofweek'] < 5), 
    # flag if wow dropped below 75% two days in a row (and the contribution 8 days ago >= 5%)
    (search_data.engine == 'ALL') & (search_data.pcnt_value_prevd_prev1w >= 0.05) & (search_data['wow'] < 0.75) & (search_data['wow_prevd'] < 0.75),
    # increase
    (search_data.engine == 'ALL') & (search_data.value > 10000) & (search_data['pcnt_value'] > 0.003) & (search_data['wow'] > 1.5*1.0/0.6) & (search_data['do2d'] > 1.5*1.0/0.4), 
	    
    ### For individual engine
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.01) & (search_data['dod'] < 0.1), # decrease (-2)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.03) & (search_data['dod'] < 0.4) & (search_data['dayofweek'] < 5), # not on Sat/Sun, decrease (-2)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-2)  

    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.03) & (search_data['dod'] < 0.25) & (search_data['dayofweek'] < 5), # decrease (-1)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] < 0.6) & (search_data['dayofweek'] < 5), # decrease (-1)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.10) & (search_data['wow'] < 0.8) & (search_data['dod_in_wow'] < 0.8), # decrease (-1)

    # increases we want to capture on day 1 -- less sensitive than the drop rules
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['dod'] > 4) & (search_data['dayofweek'] <= 1), # increase (1)
    (search_data.engine != 'ALL') & (search_data.value_prev1d > 10000) & (search_data['pcnt_value_prevd'] > 0.05) & (search_data['wow'] > 4) & (search_data['dod_in_wow'] > 2), # increase (1)

    # Rule 2: capture the drop at the end of the 2nd day if it wasn't caught on the 1st day
    # flag if volume two days ago > 10k and its contribution > 3%, wow < 50%, dod < 90%, do2d < 60%, on a weekday
    (search_data.engine != 'ALL') & (search_data.value_prev2d > 10000) & (search_data['pcnt_value_prev2d'] > 0.03) & (search_data['wow'] < 0.5) \
       & (search_data['dod'] < 0.9) & (search_data['do2d'] < 0.6) & (search_data['dayofweek'] < 5), 
    # flag if wow dropped below 60% two days in a row (and the contribution 8 days ago >= 5%)
    (search_data.engine != 'ALL') & (search_data.pcnt_value_prevd_prev1w >= 0.05) & (search_data['wow'] < 0.6) & (search_data['wow_prevd'] < 0.6),
    # increase
    #(search_data.engine != 'ALL') & (search_data.value> 10000) & (search_data['pcnt_value'] > 0.05) & (search_data['wow'] > 1.5*1.0/0.6) & (search_data['do2d'] > 1.5*1.0/0.4) 

    ]

    # severity codes aligned one-to-one with the conditions above: -2 = severe drop, -1 = drop, 1 = spike
    choices = [-2, -2, -2, -1, -1, -1, 1, 1, -1, -1, 1, -2, -2, -2, -1, -1, -1, 1, 1, -1, -1]
    search_data['abnormal'] = np.select(conditions, choices, default=0)
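    # np.select keeps the first matching condition per row; a toy illustration:
    # np.select([np.array([True, False]), np.array([True, True])], [-2, -1], default=0)
    # returns array([-2, -1]) -- row 0 matches both conditions but takes -2 from the earlier one.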
    abnormality_data = search_data.loc[search_data.abnormal != 0].copy()
    abnormality_data['is_holiday'] = [
        is_it_holiday(row['submission_date'], row['country'])
        for _, row in abnormality_data.iterrows()
    ]
    abnormality_data['latest_abnormality_in_days'] = get_days_since_1970(abnormality_data['submission_date'])

    # if there is newly added abnormality data, then add it to the alert records
    if abnormality_data.shape[0] > 0:
        if dry_run:
            print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_records'")
        else:
            print("Updating 'mozdata.analysis.desktop_search_alert_records'")
            job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
            job = client.load_table_from_dataframe(abnormality_data, 'mozdata.analysis.desktop_search_alert_records', job_config = job_config)
            job.result()
		
		
    # Append the latest abnormality date to 'mozdata.analysis.desktop_search_alert_latest_daily' every day
    # so we can properly trigger the Looker alert. Looker's time-series alert checks the last 2 rows of the
    # table, so we must append daily even when there is no new abnormality; otherwise the alert won't fire.
    query_statement = """
        WITH
        no_holiday_update AS (
        SELECT
            @submission_date AS asof,
            "No" AS is_holiday,
            DATE(MAX(submission_date)) AS latest_abnormality_date,
            MAX(latest_abnormality_in_days) AS latest_abnormality_date_int
        FROM
            `mozdata.analysis.desktop_search_alert_records`
        WHERE
            is_holiday IS FALSE
            AND (abnormal = -2 OR abnormal = 2)
            AND DATE(submission_date) <= @submission_date
        GROUP BY
            1,
            2 ),
        all_update AS (
        SELECT
            @submission_date AS asof,
            "All" AS is_holiday,
            DATE(MAX(submission_date)) AS latest_abnormality_date,
            MAX(latest_abnormality_in_days) AS latest_abnormality_date_int
        FROM
            `mozdata.analysis.desktop_search_alert_records`
        WHERE
            (abnormal = -2 OR abnormal = 2)
            AND DATE(submission_date) <= @submission_date
        GROUP BY
            1,
            2 )
        SELECT
        asof,
        is_holiday,
        latest_abnormality_date,
        latest_abnormality_date_int
        FROM
        no_holiday_update
        UNION ALL (
        SELECT
            asof,
            is_holiday,
            latest_abnormality_date,
            latest_abnormality_date_int
        FROM
            all_update)
    """

    query_job = client.query(query_statement, job_config=query_job_config)
    print("Running query:", query_job.job_id)
    latest_alert_data = (query_job.result().to_dataframe())
    
    if dry_run:
        print("Dry-run mode, will not write to 'mozdata.analysis.desktop_search_alert_latest_daily'")
    else:
        print("Updating 'mozdata.analysis.desktop_search_alert_latest_daily'")
        job_config = bigquery.LoadJobConfig(write_disposition = 'WRITE_APPEND')
        job = client.load_table_from_dataframe(latest_alert_data, 'mozdata.analysis.desktop_search_alert_latest_daily', job_config = job_config)
        job.result()


if __name__ == "__main__":
    main()
