def main()

in sql/moz-fx-data-shared-prod/cloudflare_derived/os_usage_v1/query.py

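For context, a sketch of the imports and module-level names main() relies on; the import lines follow from the call sites, while the config values shown are assumptions:

from argparse import ArgumentParser
from datetime import datetime, timedelta

from google.cloud import bigquery

# cloudflare_api_token, get_os_usage_data, and move_blob are defined elsewhere
# in query.py and are not shown here.
#
# Assumed shape of os_usg_configs: the key names and the two BQ staging table
# names are taken from the code below; the bucket and path templates are
# hypothetical. Each *_gcs_fpth template takes two %-formatted values
# (the data date and the run date).
os_usg_configs = {
    "bucket": "gs://example-cloudflare-bucket/",  # hypothetical
    "bucket_no_gs": "example-cloudflare-bucket",  # hypothetical
    "results_stg_gcs_fpth": "stg/os_results_%s_run_%s.csv",  # hypothetical
    "errors_stg_gcs_fpth": "stg/os_errors_%s_run_%s.csv",  # hypothetical
    "results_archive_gcs_fpth": "archive/os_results_%s_run_%s.csv",  # hypothetical
    "errors_archive_gcs_fpth": "archive/os_errors_%s_run_%s.csv",  # hypothetical
    "results_bq_stg_table": "moz-fx-data-shared-prod.cloudflare_derived.os_results_stg",
    "errors_bq_stg_table": "moz-fx-data-shared-prod.cloudflare_derived.os_errors_stg",
}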

def main():
    """Call the API, save data to GCS, load to BQ staging, delete & load to BQ gold"""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--date", required=True)
    parser.add_argument("--cloudflare_api_token", default=cloudflare_api_token)
    parser.add_argument("--project", default="moz-fx-data-shared-prod")
    parser.add_argument("--dataset", default="cloudflare_derived")

    args = parser.parse_args()
    print(f"Running for date: {args.date}")

    # Every step below targets the date 4 days before the run date
    data_date = datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4)

    # STEP 1 - Pull the data from the API, save results & errors to GCS staging area
    result_summary = get_os_usage_data(args.date, args.cloudflare_api_token)
    print(f"result_summary: {result_summary}")
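    # (get_os_usage_data is defined elsewhere in query.py; based on the steps
    # below, it is expected to have written result and error CSVs to the GCS
    # staging paths and to return a summary of the API pull.)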

    # Create a BigQuery client
    client = bigquery.Client(project=args.project)

    # GCS staging URIs for the result and error CSVs written in STEP 1
    result_uri = os_usg_configs["bucket"] + os_usg_configs["results_stg_gcs_fpth"] % (
        data_date,
        args.date,
    )
    error_uri = os_usg_configs["bucket"] + os_usg_configs["errors_stg_gcs_fpth"] % (
        data_date,
        args.date,
    )
    print(f"result_uri: {result_uri}")
    print(f"error_uri: {error_uri}")

    # STEP 2 - Copy the result data from GCS staging to BQ staging table
    # (WRITE_TRUNCATE replaces any previous staging contents; skip_leading_rows=1
    # skips the CSV header row)
    load_result_csv_to_bq_stg_job = client.load_table_from_uri(
        result_uri,
        os_usg_configs["results_bq_stg_table"],
        job_config=bigquery.LoadJobConfig(
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_TRUNCATE",
            schema=[
                {"name": "Timestamps", "type": "TIMESTAMP", "mode": "REQUIRED"},
                {"name": "OS", "type": "STRING", "mode": "NULLABLE"},
                {"name": "Location", "type": "STRING", "mode": "NULLABLE"},
                {"name": "DeviceType", "type": "STRING", "mode": "NULLABLE"},
                {"name": "Share", "type": "NUMERIC", "mode": "NULLABLE"},
                {"name": "ConfidenceLevel", "type": "STRING", "mode": "NULLABLE"},
                {"name": "AggrInterval", "type": "STRING", "mode": "NULLABLE"},
                {"name": "Normalization", "type": "STRING", "mode": "NULLABLE"},
                {"name": "LastUpdatedTS", "type": "TIMESTAMP", "mode": "NULLABLE"},
            ],
            skip_leading_rows=1,
            source_format=bigquery.SourceFormat.CSV,
        ),
    )

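    # result() blocks until the load job finishes and raises if the job failed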
    load_result_csv_to_bq_stg_job.result()
    result_bq_stg_tbl = client.get_table(os_usg_configs["results_bq_stg_table"])
    print("Loaded {} rows to results staging.".format(result_bq_stg_tbl.num_rows))

    # STEP 3 - Copy the error data from GCS staging to BQ staging table
    load_error_csv_to_bq_stg_job = client.load_table_from_uri(
        error_uri,
        os_usg_configs["errors_bq_stg_table"],
        job_config=bigquery.LoadJobConfig(
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_TRUNCATE",
            schema=[
                {"name": "StartDate", "type": "DATE", "mode": "REQUIRED"},
                {"name": "EndDate", "type": "DATE", "mode": "REQUIRED"},
                {"name": "Location", "type": "STRING", "mode": "NULLABLE"},
                {"name": "DeviceType", "type": "STRING", "mode": "NULLABLE"},
            ],
            skip_leading_rows=1,
            source_format=bigquery.SourceFormat.CSV,
        ),
    )

    load_error_csv_to_bq_stg_job.result()
    error_bq_stg_tbl = client.get_table(os_usg_configs["errors_bq_stg_table"])
    print("Loaded {} rows to errors staging.".format(error_bq_stg_tbl.num_rows))

    # STEP 4 - Delete any results already in gold for the target date, so reruns are idempotent
    del_exstng_gold_res_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.os_usage_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY)  """
    del_gold_res_job = client.query(del_exstng_gold_res_for_date)
    del_gold_res_job.result()
    print("Deleted anything already existing for this date from results gold")

    # STEP 5 - Delete any errors already in gold for the target date, so reruns are idempotent
    del_exstng_gold_err_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.os_usage_errors_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
    del_gold_err_job = client.query(del_exstng_gold_err_for_date)
    del_gold_err_job.result()
    print("Deleted anything already existing for this date from errors gold")

    # STEP 6 - Load results from stage to gold (TODO: this still needs updating)
    # Note: ConfidenceLevel and AggrInterval from staging are not carried into gold
    os_usg_stg_to_gold_query = f"""INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.os_usage_v1`
    SELECT
        CAST(Timestamps AS DATE) AS dte,
        OS AS os,
        Location AS location,
        DeviceType AS device_type,
        Share AS os_share,
        Normalization AS normalization_type,
        LastUpdatedTS AS last_updated_ts
    FROM `moz-fx-data-shared-prod.cloudflare_derived.os_results_stg`
    WHERE CAST(Timestamps AS DATE) = DATE_SUB('{args.date}', INTERVAL 4 DAY)"""
    load_res_to_gold = client.query(os_usg_stg_to_gold_query)
    load_res_to_gold.result()
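    print("Loaded results from staging to gold")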

    # STEP 7 - Load errors from stage to gold
    os_usg_errors_stg_to_gold_query = f"""INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.os_usage_errors_v1`
    SELECT
        StartDate AS dte,
        Location AS location,
        DeviceType AS device_type
    FROM `moz-fx-data-shared-prod.cloudflare_derived.os_errors_stg`
    WHERE StartDate = DATE_SUB('{args.date}', INTERVAL 4 DAY)"""
    load_err_to_gold = client.query(os_usg_errors_stg_to_gold_query)
    load_err_to_gold.result()
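    print("Loaded errors from staging to gold")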

    # STEP 8 - Copy the result CSV from stage to archive, then delete from stage
    # Calculate the fpaths we will use ahead of time
    result_stg_fpath = os_usg_configs["results_stg_gcs_fpth"] % (data_date, args.date)
    result_archive_fpath = os_usg_configs["results_archive_gcs_fpth"] % (
        data_date,
        args.date,
    )
    move_blob(
        os_usg_configs["bucket_no_gs"],
        result_stg_fpath,
        os_usg_configs["bucket_no_gs"],
        result_archive_fpath,
    )

    # STEP 9 - Copy the error CSV from stage to archive, then delete from stage
    error_stg_fpath = os_usg_configs["errors_stg_gcs_fpth"] % (data_date, args.date)
    error_archive_fpath = os_usg_configs["errors_archive_gcs_fpth"] % (
        data_date,
        args.date,
    )
    move_blob(
        os_usg_configs["bucket_no_gs"],
        error_stg_fpath,
        os_usg_configs["bucket_no_gs"],
        error_archive_fpath,
    )
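
# For reference, a minimal sketch of what the move_blob helper might look like,
# assuming the standard google-cloud-storage client (the real helper is defined
# elsewhere in query.py). It matches the call sites above: bucket names are
# passed without the gs:// prefix, and the source object is deleted after the
# copy, which is why the STEP 8/9 comments say "then delete from stage".
from google.cloud import storage


def move_blob(src_bucket_name, src_blob_name, dst_bucket_name, dst_blob_name):
    """Copy a blob from one bucket/path to another, then delete the original."""
    storage_client = storage.Client()
    src_bucket = storage_client.bucket(src_bucket_name)
    dst_bucket = storage_client.bucket(dst_bucket_name)
    src_blob = src_bucket.blob(src_blob_name)
    src_bucket.copy_blob(src_blob, dst_bucket, dst_blob_name)  # copy to archive
    src_blob.delete()  # remove the staged copy


# Example invocation (assuming query.py runs main() under a __main__ guard):
#   python query.py --date 2024-06-05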