in sql/moz-fx-data-shared-prod/cloudflare_derived/browser_usage_v1/query.py [0:0]
def main():
"""Call the API, save data to GCS, load to BQ staging, delete & load to BQ gold"""
parser = ArgumentParser(description=__doc__)
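# --date is the run date in YYYY-MM-DD format (parsed with strptime below)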
parser.add_argument("--date", required=True)
parser.add_argument("--cloudflare_api_token", default=cloudflare_api_token)
parser.add_argument("--project", default=brwsr_usg_configs["gcp_project_id"])
parser.add_argument("--dataset", default="cloudflare_derived")
args = parser.parse_args()
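# Example invocation (the date value is illustrative):
#   python query.py --date 2024-01-15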
print("Running for date: ")
print(args.date)
# STEP 1 - Pull the data from the API, save results & errors to GCS staging area
results = get_browser_data(args.date, args.cloudflare_api_token)
nbr_successful, nbr_errors = results[0], results[1]
result_summary = f"# Result Rows: {nbr_successful}; # of Error Rows: {nbr_errors}"
print(result_summary)
# Create a bigquery client
client = bigquery.Client(args.project)
# The data's logical date lags the run date by 4 days
logical_dte = datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4)
result_uri = brwsr_usg_configs["bucket"] + brwsr_usg_configs[
"results_stg_gcs_fpth"
] % (logical_dte, args.date)
error_uri = brwsr_usg_configs["bucket"] + brwsr_usg_configs[
"errors_stg_gcs_fpth"
] % (logical_dte, args.date)
print(f"result_uri: {result_uri}")
print(f"error_uri: {error_uri}")
# STEP 2 - Copy the result data from GCS staging to BQ staging table
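# WRITE_TRUNCATE replaces any prior staging contents; skip_leading_rows=1
# skips the CSV header row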
load_result_csv_to_bq_stg_job = client.load_table_from_uri(
result_uri,
brwsr_usg_configs["results_bq_stg_table"],
job_config=bigquery.LoadJobConfig(
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
schema=[
{"name": "StartTime", "type": "TIMESTAMP", "mode": "REQUIRED"},
{"name": "EndTime", "type": "TIMESTAMP", "mode": "REQUIRED"},
{"name": "DeviceType", "type": "STRING", "mode": "NULLABLE"},
{"name": "Location", "type": "STRING", "mode": "NULLABLE"},
{"name": "UserType", "type": "STRING", "mode": "NULLABLE"},
{"name": "Browser", "type": "STRING", "mode": "NULLABLE"},
{"name": "OperatingSystem", "type": "STRING", "mode": "NULLABLE"},
{"name": "PercentShare", "type": "NUMERIC", "mode": "NULLABLE"},
{"name": "ConfLevel", "type": "STRING", "mode": "NULLABLE"},
{"name": "Normalization", "type": "STRING", "mode": "NULLABLE"},
{"name": "LastUpdated", "type": "TIMESTAMP", "mode": "NULLABLE"},
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
),
)
load_result_csv_to_bq_stg_job.result()
result_bq_stg_tbl = client.get_table(brwsr_usg_configs["results_bq_stg_table"])
print(f"Loaded {result_bq_stg_tbl.num_rows} rows to results staging.")
# STEP 3 - Copy the error data from GCS staging to BQ staging table
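# Note: the error CSV stores StartTime/EndTime as DATE, unlike the
# TIMESTAMP columns in the results CSV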
load_error_csv_to_bq_stg_job = client.load_table_from_uri(
error_uri,
brwsr_usg_configs["errors_bq_stg_table"],
job_config=bigquery.LoadJobConfig(
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
schema=[
{"name": "StartTime", "type": "DATE", "mode": "REQUIRED"},
{"name": "EndTime", "type": "DATE", "mode": "REQUIRED"},
{"name": "Location", "type": "STRING", "mode": "NULLABLE"},
{"name": "UserType", "type": "STRING", "mode": "NULLABLE"},
{"name": "DeviceType", "type": "STRING", "mode": "NULLABLE"},
{"name": "OperatingSystem", "type": "STRING", "mode": "NULLABLE"},
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
),
)
load_error_csv_to_bq_stg_job.result()
error_bq_stg_tbl = client.get_table(brwsr_usg_configs["errors_bq_stg_table"])
print(f"Loaded {error_bq_stg_tbl.num_rows} rows to errors staging.")
# STEP 4 - Delete any existing results from gold for this date, so reruns are idempotent
del_exstng_gold_res_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.browser_usage_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
del_gold_res_job = client.query(del_exstng_gold_res_for_date)
del_gold_res_job.result()
print("Deleted any existing rows for this date from results gold")
# STEP 5 - Delete any existing errors from gold for this date, so reruns are idempotent
del_exstng_gold_err_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.browser_usage_errors_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
del_gold_err_job = client.query(del_exstng_gold_err_for_date)
del_gold_err_job.result()
print("Deleted any existing rows for this date from errors gold")
# STEP 6 - Load results from stage to gold
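# StartTime is cast to a date for the gold dte column; ConfLevel is loaded
# to staging but not carried into gold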
browser_usg_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.browser_usage_v1`
SELECT
CAST(StartTime as date) AS dte,
DeviceType AS device_type,
Location AS location,
UserType AS user_type,
Browser AS browser,
OperatingSystem AS operating_system,
PercentShare AS percent_share,
Normalization AS normalization,
LastUpdated AS last_updated_ts
FROM `moz-fx-data-shared-prod.cloudflare_derived.browser_results_stg`
WHERE CAST(StartTime as date) = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
load_res_to_gold = client.query(browser_usg_stg_to_gold_query)
load_res_to_gold.result()
# STEP 7 - Load errors from stage to gold
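# Errors gold keeps only the date and dimension columns (no metric values)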
browser_usg_errors_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.browser_usage_errors_v1`
SELECT
CAST(StartTime as date) AS dte,
Location AS location,
UserType AS user_type,
DeviceType AS device_type,
OperatingSystem AS operating_system
FROM `moz-fx-data-shared-prod.cloudflare_derived.browser_errors_stg`
WHERE CAST(StartTime as date) = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
load_err_to_gold = client.query(browser_usg_errors_stg_to_gold_query)
load_err_to_gold.result()
# STEP 8 - Copy the result CSV from stage to archive, then delete from stage
# Calculate the fpaths we will use ahead of time
result_stg_fpath = brwsr_usg_configs["results_stg_gcs_fpth"] % (
logical_dte,
args.date,
)
result_archive_fpath = brwsr_usg_configs["results_archive_gcs_fpath"] % (
logical_dte,
args.date,
)
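# move_blob copies the object to the archive path, then deletes the staging copy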
move_blob(
brwsr_usg_configs["bucket_no_gs"],
result_stg_fpath,
brwsr_usg_configs["bucket_no_gs"],
result_archive_fpath,
)
# STEP 9 - Copy the error CSV from stage to archive, then delete from stage
error_stg_fpath = brwsr_usg_configs["errors_stg_gcs_fpth"] % (
logical_dte,
args.date,
)
error_archive_fpath = brwsr_usg_configs["errors_archive_gcs_fpath"] % (
logical_dte,
args.date,
)
move_blob(
brwsr_usg_configs["bucket_no_gs"],
error_stg_fpath,
brwsr_usg_configs["bucket_no_gs"],
error_archive_fpath,
)
# If more than 200 errors (more than 10%), fail the job so the issue gets investigated
if int(nbr_errors) > 200:
raise Exception("More than 200 errors, check for issues")