sql/moz-fx-data-shared-prod/cloudflare_derived/device_usage_v1/query.py (346 lines of code) (raw):

# Load libraries import json import os from datetime import datetime, timedelta import pandas as pd import requests from argparse import ArgumentParser from google.cloud import bigquery from google.cloud import storage # Configs device_usg_configs = { "timeout_limit": 500, "locations": [ "ALL", "AU", "BE", "BG", "BR", "CA", "CH", "CZ", "DE", "DK", "EE", "ES", "FI", "FR", "GB", "HR", "IE", "IN", "IT", "JP", "KE", "CY", "LV", "LT", "LU", "HU", "MT", "MX", "NL", "AT", "PL", "PT", "RO", "SI", "SK", "US", "SE", "GR", ], "bucket": "gs://moz-fx-data-prod-external-data/", "bucket_no_gs": "moz-fx-data-prod-external-data", "results_stg_gcs_fpth": "cloudflare/device_usage/RESULTS_STAGING/%s_results_from_%s_run.csv", "results_archive_gcs_fpath": "cloudflare/device_usage/RESULTS_ARCHIVE/%s_results_from_%s_run.csv", "errors_stg_gcs_fpth": "cloudflare/device_usage/ERRORS_STAGING/%s_errors_from_%s_run.csv", "errors_archive_gcs_fpath": "cloudflare/device_usage/ERRORS_ARCHIVE/%s_errors_from_%s_run.csv", "gcp_project_id": "moz-fx-data-shared-prod", "gcp_conn_id": "google_cloud_shared_prod", "results_bq_stg_table": "moz-fx-data-shared-prod.cloudflare_derived.device_results_stg", "errors_bq_stg_table": "moz-fx-data-shared-prod.cloudflare_derived.device_errors_stg", } # Load the Cloudflare API Token cloudflare_api_token = os.getenv("CLOUDFLARE_AUTH_TOKEN") # Define a function to move a GCS object then delete the original def move_blob(bucket_name, blob_name, destination_bucket_name, destination_blob_name): """Moves a blob from one bucket to another with a new name.""" storage_client = storage.Client() source_bucket = storage_client.bucket(bucket_name) source_blob = source_bucket.blob(blob_name) destination_bucket = storage_client.bucket(destination_bucket_name) destination_generation_match_precondition = None blob_copy = source_bucket.copy_blob( source_blob, destination_bucket, destination_blob_name, if_generation_match=destination_generation_match_precondition, ) source_bucket.delete_blob(blob_name) print( "Blob {} in bucket {} moved to blob {} in bucket {}.".format( source_blob.name, source_bucket.name, blob_copy.name, destination_bucket.name, ) ) def generate_device_type_timeseries_api_call(strt_dt, end_dt, agg_int, location): """Calculate API to call based on given parameters.""" if location == "ALL": device_usage_api_url = f"https://api.cloudflare.com/client/v4/radar/http/timeseries_groups/device_type?name=human&botClass=LIKELY_HUMAN&dateStart={strt_dt}T00:00:00.000Z&dateEnd={end_dt}T00:00:00.000Z&name=bot&botClass=LIKELY_AUTOMATED&dateStart={strt_dt}T00:00:00.000Z&dateEnd={end_dt}T00:00:00.000Z&format=json&aggInterval={agg_int}" else: device_usage_api_url = f"https://api.cloudflare.com/client/v4/radar/http/timeseries_groups/device_type?name=human&botClass=LIKELY_HUMAN&dateStart={strt_dt}T00:00:00.000Z&dateEnd={end_dt}T00:00:00.000Z&location={location}&name=bot&botClass=LIKELY_AUTOMATED&dateStart={strt_dt}T00:00:00.000Z&dateEnd={end_dt}T00:00:00.000Z&location={location}&format=json&aggInterval={agg_int}" return device_usage_api_url def parse_device_type_timeseries_response_human(result): """Take the response JSON and returns parsed human traffic information.""" human_timestamps = result["human"]["timestamps"][0] human_desktop = result["human"]["desktop"][0] human_mobile = result["human"]["mobile"][0] human_other = result["human"]["other"][0] return human_timestamps, human_desktop, human_mobile, human_other def parse_device_type_timeseries_response_bot(result): """Take the response JSON and returns parsed bot traffic information.""" bot_timestamps = result["bot"]["timestamps"][0] bot_desktop = result["bot"]["desktop"][0] bot_mobile = result["bot"]["mobile"][0] bot_other = result["bot"]["other"][0] return bot_timestamps, bot_desktop, bot_mobile, bot_other # Generate the result dataframe def make_device_usage_result_df( user_type, desktop, mobile, other, timestamps, last_upd, norm, conf, agg_interval, location, ): """Initialize a result dataframe for device usage data.""" return pd.DataFrame( { "Timestamp": timestamps, "UserType": [user_type], "Location": [location], "DesktopUsagePct": desktop, "MobileUsagePct": mobile, "OtherUsagePct": other, "ConfLevel": [conf], "AggInterval": [agg_interval], "NormalizationType": [norm], "LastUpdated": [last_upd], } ) def get_device_usage_data(date_of_interest, auth_token): """Call API and retrieve device usage data and save both errors & results to GCS.""" # Calculate start date and end date logical_dag_dt = date_of_interest logical_dag_dt_as_date = datetime.strptime(logical_dag_dt, "%Y-%m-%d").date() start_date = logical_dag_dt_as_date - timedelta(days=4) end_date = start_date + timedelta(days=1) print("Start Date: ", start_date) print("End Date: ", end_date) # Configure request headers bearer_string = f"Bearer {auth_token}" headers = {"Authorization": bearer_string} # Initialize the empty results & errors dataframe results_df = pd.DataFrame( { "Timestamp": [], "UserType": [], "Location": [], "DesktopUsagePct": [], "MobileUsagePct": [], "OtherUsagePct": [], "ConfLevel": [], "AggInterval": [], "NormalizationType": [], "LastUpdated": [], } ) errors_df = pd.DataFrame({"StartTime": [], "EndTime": [], "Location": []}) # For each location, call the API to get device usage data for loc in device_usg_configs["locations"]: print("Loc: ", loc) # Generate the URL device_usage_api_url = generate_device_type_timeseries_api_call( start_date, end_date, "1d", loc ) try: # Call the API and save the response as JSON response = requests.get( device_usage_api_url, headers=headers, timeout=device_usg_configs["timeout_limit"], ) response_json = json.loads(response.text) # If response was successful, get the result if response_json["success"] is True: result = response_json["result"] human_ts, human_dsktp, human_mbl, human_othr = ( parse_device_type_timeseries_response_human(result) ) bot_ts, bot_dsktp, bot_mbl, bot_othr = ( parse_device_type_timeseries_response_bot(result) ) conf_lvl = result["meta"]["confidenceInfo"]["level"] aggr_intvl = result["meta"]["aggInterval"] nrmlztn = result["meta"]["normalization"] lst_upd = result["meta"]["lastUpdated"] # Save to the results dataframe ### FIX BELOW HERE #### human_result_df = make_device_usage_result_df( "Human", human_dsktp, human_mbl, human_othr, human_ts, lst_upd, nrmlztn, conf_lvl, aggr_intvl, loc, ) bot_result_df = make_device_usage_result_df( "Bot", bot_dsktp, bot_mbl, bot_othr, bot_ts, lst_upd, nrmlztn, conf_lvl, aggr_intvl, loc, ) # Union the results new_result_df = pd.concat( [human_result_df, bot_result_df], ignore_index=True, sort=False ) # Add results to the results dataframe results_df = pd.concat([results_df, new_result_df]) # If response was not successful, save to the errors dataframe else: new_errors_df = pd.DataFrame( { "StartTime": [start_date], "EndTime": [end_date], "Location": [loc], } ) errors_df = pd.concat([errors_df, new_errors_df]) except: new_errors_df = pd.DataFrame( {"StartTime": [start_date], "EndTime": [end_date], "Location": [loc]} ) errors_df = pd.concat([errors_df, new_errors_df]) # LOAD RESULTS & ERRORS TO STAGING GCS result_fpath = device_usg_configs["bucket"] + device_usg_configs[ "results_stg_gcs_fpth" ] % (start_date, logical_dag_dt) error_fpath = device_usg_configs["bucket"] + device_usg_configs[ "errors_stg_gcs_fpth" ] % (start_date, logical_dag_dt) results_df.to_csv(result_fpath, index=False) errors_df.to_csv(error_fpath, index=False) print("Wrote errors to: ", error_fpath) print("Wrote results to: ", result_fpath) # Print a summary to the console len_results = str(len(results_df)) len_errors = str(len(errors_df)) results_summary = [len_results, len_errors] return results_summary def main(): """Call the API, save data to GCS, load to BQ staging, delete & load to BQ gold""" parser = ArgumentParser(description=__doc__) parser.add_argument("--date", required=True) parser.add_argument("--cloudflare_api_token", default=cloudflare_api_token) parser.add_argument("--project", default="moz-fx-data-shared-prod") parser.add_argument("--dataset", default="cloudflare_derived") args = parser.parse_args() print("Running for date: ") print(args.date) # STEP 1 - Pull the data from the API, save results & errors to GCS staging area results_summary = get_device_usage_data(args.date, args.cloudflare_api_token) nbr_successful = results_summary[0] nbr_errors = results_summary[1] result_summary = f"# Result Rows: {nbr_successful}; # of Error Rows: {nbr_errors}" print("result_summary") print(result_summary) # Create a bigquery client client = bigquery.Client(args.project) result_uri = device_usg_configs["bucket"] + device_usg_configs[ "results_stg_gcs_fpth" ] % (datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4), args.date) error_uri = device_usg_configs["bucket"] + device_usg_configs[ "errors_stg_gcs_fpth" ] % (datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4), args.date) print("result_uri") print(result_uri) print("error_uri") print(error_uri) # STEP 2 - Copy the result data from GCS staging to BQ staging table load_result_csv_to_bq_stg_job = client.load_table_from_uri( result_uri, device_usg_configs["results_bq_stg_table"], job_config=bigquery.LoadJobConfig( create_disposition="CREATE_IF_NEEDED", write_disposition="WRITE_TRUNCATE", schema=[ {"name": "StartTime", "type": "TIMESTAMP", "mode": "REQUIRED"}, {"name": "UserType", "type": "STRING", "mode": "NULLABLE"}, {"name": "Location", "type": "STRING", "mode": "NULLABLE"}, {"name": "DesktopUsagePct", "type": "NUMERIC", "mode": "NULLABLE"}, {"name": "MobileUsagePct", "type": "NUMERIC", "mode": "NULLABLE"}, {"name": "OtherUsagePct", "type": "NUMERIC", "mode": "NULLABLE"}, {"name": "ConfLevel", "type": "STRING", "mode": "NULLABLE"}, {"name": "AggInterval", "type": "STRING", "mode": "NULLABLE"}, {"name": "NormalizationType", "type": "STRING", "mode": "NULLABLE"}, {"name": "LastUpdated", "type": "TIMESTAMP", "mode": "NULLABLE"}, ], skip_leading_rows=1, source_format=bigquery.SourceFormat.CSV, ), ) load_result_csv_to_bq_stg_job.result() result_bq_stg_tbl = client.get_table(device_usg_configs["results_bq_stg_table"]) print("Loaded {} rows to results staging.".format(result_bq_stg_tbl.num_rows)) # STEP 3 - Copy the error data from GCS staging to BQ staging table load_error_csv_to_bq_stg_job = client.load_table_from_uri( error_uri, device_usg_configs["errors_bq_stg_table"], job_config=bigquery.LoadJobConfig( create_disposition="CREATE_IF_NEEDED", write_disposition="WRITE_TRUNCATE", schema=[ {"name": "StartTime", "type": "DATE", "mode": "REQUIRED"}, {"name": "EndTime", "type": "DATE", "mode": "REQUIRED"}, {"name": "Location", "type": "STRING", "mode": "NULLABLE"}, ], skip_leading_rows=1, source_format=bigquery.SourceFormat.CSV, ), ) load_error_csv_to_bq_stg_job.result() error_bq_stg_tbl = client.get_table(device_usg_configs["errors_bq_stg_table"]) print("Loaded {} rows to errors staging.".format(error_bq_stg_tbl.num_rows)) # STEP 4 - Delete results from gold for this day, if there are any already (so if rerun, no issues will occur) del_exstng_gold_res_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.device_usage_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """ del_gold_res_job = client.query(del_exstng_gold_res_for_date) del_gold_res_job.result() print("Deleted anything already existing for this date from results gold") # STEP 5 - Delete errors from gold for this day, if there are any already (so if rerun, no issues will occur) del_exstng_gold_err_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.device_usage_errors_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """ del_gold_err_job = client.query(del_exstng_gold_err_for_date) del_gold_err_job.result() print("Deleted anything already existing for this date from errors gold") # STEP 6 - Load results from stage to gold device_usg_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.device_usage_v1` SELECT CAST(StartTime AS DATE) AS dte, UserType AS user_type, Location AS location, DesktopUsagePct AS desktop_usage_pct, MobileUsagePct AS mobile_usage_pct, OtherUsagePct AS other_usage_pct, AggInterval AS aggregation_interval, NormalizationType AS normalization_type, LastUpdated AS last_updated_ts FROM `moz-fx-data-shared-prod.cloudflare_derived.device_results_stg` WHERE CAST(StartTime as date) = DATE_SUB('{args.date}', INTERVAL 4 DAY) """ load_res_to_gold = client.query(device_usg_stg_to_gold_query) load_res_to_gold.result() # STEP 7 - Load errors from stage to gold device_usg_errors_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.device_usage_errors_v1` SELECT CAST(StartTime as date) AS dte, Location AS location FROM `moz-fx-data-shared-prod.cloudflare_derived.device_errors_stg` WHERE CAST(StartTime as date) = DATE_SUB('{args.date}', INTERVAL 4 DAY) """ load_err_to_gold = client.query(device_usg_errors_stg_to_gold_query) load_err_to_gold.result() # STEP 8 - Copy the result CSV from stage to archive, then delete from stage # Calculate the fpaths we will use ahead of time result_stg_fpath = device_usg_configs["results_stg_gcs_fpth"] % ( datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4), args.date, ) result_archive_fpath = device_usg_configs["results_archive_gcs_fpath"] % ( datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4), args.date, ) move_blob( device_usg_configs["bucket_no_gs"], result_stg_fpath, device_usg_configs["bucket_no_gs"], result_archive_fpath, ) # STEP 9 - Copy the error CSV from stage to archive, then delete from stage error_stg_fpath = device_usg_configs["errors_stg_gcs_fpth"] % ( datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4), args.date, ) error_archive_fpath = device_usg_configs["errors_archive_gcs_fpath"] % ( datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4), args.date, ) move_blob( device_usg_configs["bucket_no_gs"], error_stg_fpath, device_usg_configs["bucket_no_gs"], error_archive_fpath, ) # Lastly, if # errors > 4, fail with error if int(nbr_errors) > 4: raise Exception("5 or more errors, check for issues") if __name__ == "__main__": main()