def main()

in sql/moz-fx-data-shared-prod/cloudflare_derived/browser_usage_v1/query.py [0:0]


def main():
    """Call the API, save data to GCS, load to BQ staging, delete & load to BQ gold"""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--date", required=True)
    parser.add_argument("--cloudflare_api_token", default=cloudflare_api_token)
    parser.add_argument("--project", default=brwsr_usg_configs["gcp_project_id"])
    parser.add_argument("--dataset", default="cloudflare_derived")

    args = parser.parse_args()
    print("Running for date: ")
    print(args.date)

    # STEP 1 - Pull the data from the API, save results & errors to GCS staging area
    results = get_browser_data(args.date, args.cloudflare_api_token)
    nbr_successful, nbr_errors = results[0], results[1]
    result_summary = f"# Result Rows: {nbr_successful}; # of Error Rows: {nbr_errors}"
    print(result_summary)

    # Create a BigQuery client (used for all load, delete, and insert jobs below)
    client = bigquery.Client(project=args.project)

    # The staging paths (and the gold-table date filters below) use the run date minus 4 days
    data_date = datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4)

    result_uri = brwsr_usg_configs["bucket"] + brwsr_usg_configs[
        "results_stg_gcs_fpth"
    ] % (data_date, args.date)
    error_uri = brwsr_usg_configs["bucket"] + brwsr_usg_configs[
        "errors_stg_gcs_fpth"
    ] % (data_date, args.date)
    print(f"result_uri: {result_uri}")
    print(f"error_uri: {error_uri}")

    # STEP 2 - Copy the result data from GCS staging to BQ staging table
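    # write_disposition=WRITE_TRUNCATE replaces the staging table's contents on every run,
    # so staging only ever holds the CSV from the current run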
    load_result_csv_to_bq_stg_job = client.load_table_from_uri(
        result_uri,
        brwsr_usg_configs["results_bq_stg_table"],
        job_config=bigquery.LoadJobConfig(
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_TRUNCATE",
            schema=[
                {"name": "StartTime", "type": "TIMESTAMP", "mode": "REQUIRED"},
                {"name": "EndTime", "type": "TIMESTAMP", "mode": "REQUIRED"},
                {"name": "DeviceType", "type": "STRING", "mode": "NULLABLE"},
                {"name": "Location", "type": "STRING", "mode": "NULLABLE"},
                {"name": "UserType", "type": "STRING", "mode": "NULLABLE"},
                {"name": "Browser", "type": "STRING", "mode": "NULLABLE"},
                {"name": "OperatingSystem", "type": "STRING", "mode": "NULLABLE"},
                {"name": "PercentShare", "type": "NUMERIC", "mode": "NULLABLE"},
                {"name": "ConfLevel", "type": "STRING", "mode": "NULLABLE"},
                {"name": "Normalization", "type": "STRING", "mode": "NULLABLE"},
                {"name": "LastUpdated", "type": "TIMESTAMP", "mode": "NULLABLE"},
            ],
            skip_leading_rows=1,
            source_format=bigquery.SourceFormat.CSV,
        ),
    )
    load_result_csv_to_bq_stg_job.result()
    result_bq_stg_tbl = client.get_table(brwsr_usg_configs["results_bq_stg_table"])
    print("Loaded {} rows to results staging.".format(result_bq_stg_tbl.num_rows))

    # STEP 3 - Copy the error data from GCS staging to BQ staging table
    load_error_csv_to_bq_stg_job = client.load_table_from_uri(
        error_uri,
        brwsr_usg_configs["errors_bq_stg_table"],
        job_config=bigquery.LoadJobConfig(
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_TRUNCATE",
            schema=[
                {"name": "StartTime", "type": "DATE", "mode": "REQUIRED"},
                {"name": "EndTime", "type": "DATE", "mode": "REQUIRED"},
                {"name": "Location", "type": "STRING", "mode": "NULLABLE"},
                {"name": "UserType", "type": "STRING", "mode": "NULLABLE"},
                {"name": "DeviceType", "type": "STRING", "mode": "NULLABLE"},
                {"name": "OperatingSystem", "type": "STRING", "mode": "NULLABLE"},
            ],
            skip_leading_rows=1,
            source_format=bigquery.SourceFormat.CSV,
        ),
    )

    load_error_csv_to_bq_stg_job.result()
    error_bq_stg_tbl = client.get_table(brwsr_usg_configs["errors_bq_stg_table"])
    print("Loaded {} rows to errors staging.".format(error_bq_stg_tbl.num_rows))

    # STEP 4 - Delete any existing gold results for this data date so reruns don't create duplicates
    del_exstng_gold_res_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.browser_usage_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY)  """
    del_gold_res_job = client.query(del_exstng_gold_res_for_date)
    del_gold_res_job.result()
    print("Deleted anything already existing for this date from results gold")

    # STEP 5 - Delete any existing gold errors for this data date so reruns don't create duplicates
    del_exstng_gold_err_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.browser_usage_errors_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
    del_gold_err_job = client.query(del_exstng_gold_err_for_date)
    del_gold_err_job.result()
    print("Deleted anything already existing for this date from errors gold")

    # STEP 6 - Load results from stage to gold
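    # The WHERE clause filters to the same data date deleted in STEP 4, keeping the
    # delete + insert pair idempotent on reruns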
    browser_usg_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.browser_usage_v1`
SELECT
CAST(StartTime as date) AS dte,
DeviceType AS device_type,
Location AS location,
UserType AS user_type,
Browser AS browser,
OperatingSystem AS operating_system,
PercentShare AS percent_share,
Normalization AS normalization,
LastUpdated AS last_updated_ts
FROM `moz-fx-data-shared-prod.cloudflare_derived.browser_results_stg`
WHERE CAST(StartTime as date) = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
    load_res_to_gold = client.query(browser_usg_stg_to_gold_query)
    load_res_to_gold.result()

    # STEP 7 - Load errors from stage to gold
    browser_usg_errors_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.browser_usage_errors_v1`
SELECT
CAST(StartTime as date) AS dte,
Location AS location,
UserType AS user_type,
DeviceType AS device_type,
OperatingSystem AS operating_system
FROM `moz-fx-data-shared-prod.cloudflare_derived.browser_errors_stg`
WHERE CAST(StartTime as date) = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
    load_err_to_gold = client.query(browser_usg_errors_stg_to_gold_query)
    load_err_to_gold.result()

    # STEP 8 - Copy the result CSV from stage to archive, then delete from stage
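    # move_blob archives within the same bucket: it copies the staging object to the
    # archive path and then deletes the staging copy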

    # Calculate the file paths we will use ahead of time
    result_stg_fpath = brwsr_usg_configs["results_stg_gcs_fpth"] % (data_date, args.date)
    result_archive_fpath = brwsr_usg_configs["results_archive_gcs_fpath"] % (
        data_date,
        args.date,
    )
    move_blob(
        brwsr_usg_configs["bucket_no_gs"],
        result_stg_fpath,
        brwsr_usg_configs["bucket_no_gs"],
        result_archive_fpath,
    )

    # STEP 9 - Copy the error CSV from stage to archive, then delete from stage
    error_stg_fpath = brwsr_usg_configs["errors_stg_gcs_fpth"] % (data_date, args.date)
    error_archive_fpath = brwsr_usg_configs["errors_archive_gcs_fpath"] % (
        data_date,
        args.date,
    )
    move_blob(
        brwsr_usg_configs["bucket_no_gs"],
        error_stg_fpath,
        brwsr_usg_configs["bucket_no_gs"],
        error_archive_fpath,
    )

    # If the error count exceeds 200 (more than 10%), fail the job
    if int(nbr_errors) > 200:
        raise Exception("More than 200 errors, check for issues")
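
# Hypothetical invocation (the __main__ guard and any scheduler wiring are outside this
# excerpt); the job takes the run date and optionally overrides project/dataset, e.g.:
#   python query.py --date 2025-06-01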