in sql/moz-fx-data-shared-prod/cloudflare_derived/device_usage_v1/query.py [0:0]
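# NOTE: this excerpt assumes the module-level setup earlier in the file: ArgumentParser
# (argparse), datetime/timedelta, google.cloud.bigquery, and the helpers/config it references
# (device_usg_configs, get_device_usage_data, move_blob, cloudflare_api_token)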
def main():
"""Call the API, save data to GCS, load to BQ staging, delete & load to BQ gold"""
parser = ArgumentParser(description=__doc__)
parser.add_argument("--date", required=True)
parser.add_argument("--cloudflare_api_token", default=cloudflare_api_token)
parser.add_argument("--project", default="moz-fx-data-shared-prod")
parser.add_argument("--dataset", default="cloudflare_derived")
args = parser.parse_args()
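# NOTE: --date must be in YYYY-MM-DD form; the strptime calls below raise ValueError otherwise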
print("Running for date: ")
print(args.date)
# STEP 1 - Pull the data from the API, save results & errors to GCS staging area
nbr_successful, nbr_errors = get_device_usage_data(
args.date, args.cloudflare_api_token
)
print(f"# Result Rows: {nbr_successful}; # of Error Rows: {nbr_errors}")
# Create a bigquery client
client = bigquery.Client(args.project)
# Everything downstream targets the date 4 days before the run date, so compute it once
logical_dte = datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4)
result_uri = device_usg_configs["bucket"] + device_usg_configs[
"results_stg_gcs_fpth"
] % (logical_dte, args.date)
error_uri = device_usg_configs["bucket"] + device_usg_configs[
"errors_stg_gcs_fpth"
] % (logical_dte, args.date)
print(f"result_uri: {result_uri}")
print(f"error_uri: {error_uri}")
# STEP 2 - Copy the result data from GCS staging to BQ staging table
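# WRITE_TRUNCATE replaces any previous contents of the staging table, so a rerun starts clean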
load_result_csv_to_bq_stg_job = client.load_table_from_uri(
result_uri,
device_usg_configs["results_bq_stg_table"],
job_config=bigquery.LoadJobConfig(
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
schema=[
{"name": "StartTime", "type": "TIMESTAMP", "mode": "REQUIRED"},
{"name": "UserType", "type": "STRING", "mode": "NULLABLE"},
{"name": "Location", "type": "STRING", "mode": "NULLABLE"},
{"name": "DesktopUsagePct", "type": "NUMERIC", "mode": "NULLABLE"},
{"name": "MobileUsagePct", "type": "NUMERIC", "mode": "NULLABLE"},
{"name": "OtherUsagePct", "type": "NUMERIC", "mode": "NULLABLE"},
{"name": "ConfLevel", "type": "STRING", "mode": "NULLABLE"},
{"name": "AggInterval", "type": "STRING", "mode": "NULLABLE"},
{"name": "NormalizationType", "type": "STRING", "mode": "NULLABLE"},
{"name": "LastUpdated", "type": "TIMESTAMP", "mode": "NULLABLE"},
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
),
)
load_result_csv_to_bq_stg_job.result()
result_bq_stg_tbl = client.get_table(device_usg_configs["results_bq_stg_table"])
print(f"Loaded {result_bq_stg_tbl.num_rows} rows to results staging.")
# STEP 3 - Copy the error data from GCS staging to BQ staging table
load_error_csv_to_bq_stg_job = client.load_table_from_uri(
error_uri,
device_usg_configs["errors_bq_stg_table"],
job_config=bigquery.LoadJobConfig(
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
schema=[
{"name": "StartTime", "type": "DATE", "mode": "REQUIRED"},
{"name": "EndTime", "type": "DATE", "mode": "REQUIRED"},
{"name": "Location", "type": "STRING", "mode": "NULLABLE"},
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
),
)
load_error_csv_to_bq_stg_job.result()
error_bq_stg_tbl = client.get_table(device_usg_configs["errors_bq_stg_table"])
print(f"Loaded {error_bq_stg_tbl.num_rows} rows to errors staging.")
# STEP 4 - Delete results from gold for this day, if there are any already (so if rerun, no issues will occur)
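# DATE_SUB mirrors the 4-day lag used for the staging file paths above; args.date is a
# command-line value, so f-string interpolation is tolerable here, but switch to query
# parameters if this ever takes untrusted input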
del_exstng_gold_res_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.device_usage_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
del_gold_res_job = client.query(del_exstng_gold_res_for_date)
del_gold_res_job.result()
print("Deleted anything already existing for this date from results gold")
# STEP 5 - Delete errors from gold for this day, if there are any already (so if rerun, no issues will occur)
del_exstng_gold_err_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.device_usage_errors_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
del_gold_err_job = client.query(del_exstng_gold_err_for_date)
del_gold_err_job.result()
print("Deleted anything already existing for this date from errors gold")
# STEP 6 - Load results from stage to gold
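# The INSERT has no explicit column list, so the SELECT below must match the gold table's
# columns exactly; the staged ConfLevel column is not carried forward into gold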
device_usg_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.device_usage_v1`
SELECT
CAST(StartTime AS DATE) AS dte,
UserType AS user_type,
Location AS location,
DesktopUsagePct AS desktop_usage_pct,
MobileUsagePct AS mobile_usage_pct,
OtherUsagePct AS other_usage_pct,
AggInterval AS aggregation_interval,
NormalizationType AS normalization_type,
LastUpdated AS last_updated_ts
FROM `moz-fx-data-shared-prod.cloudflare_derived.device_results_stg`
WHERE CAST(StartTime AS DATE) = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
load_res_to_gold = client.query(device_usg_stg_to_gold_query)
load_res_to_gold.result()
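# QueryJob.num_dml_affected_rows is populated once the DML job completes; log it to mirror
# the staging-load row counts
print(f"Loaded {load_res_to_gold.num_dml_affected_rows} rows to results gold.")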
# STEP 7 - Load errors from stage to gold
device_usg_errors_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.device_usage_errors_v1`
SELECT
CAST(StartTime AS DATE) AS dte,
Location AS location
FROM `moz-fx-data-shared-prod.cloudflare_derived.device_errors_stg`
WHERE CAST(StartTime AS DATE) = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
load_err_to_gold = client.query(device_usg_errors_stg_to_gold_query)
load_err_to_gold.result()
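print(f"Loaded {load_err_to_gold.num_dml_affected_rows} rows to errors gold.")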
# STEP 8 - Copy the result CSV from stage to archive, then delete from stage
# Calculate the fpaths we will use ahead of time
result_stg_fpath = device_usg_configs["results_stg_gcs_fpth"] % (
logical_dte,
args.date,
)
result_archive_fpath = device_usg_configs["results_archive_gcs_fpath"] % (
logical_dte,
args.date,
)
move_blob(
device_usg_configs["bucket_no_gs"],
result_stg_fpath,
device_usg_configs["bucket_no_gs"],
result_archive_fpath,
)
# STEP 9 - Copy the error CSV from stage to archive, then delete from stage
error_stg_fpath = device_usg_configs["errors_stg_gcs_fpth"] % (
logical_dte,
args.date,
)
error_archive_fpath = device_usg_configs["errors_archive_gcs_fpath"] % (
logical_dte,
args.date,
)
move_blob(
device_usg_configs["bucket_no_gs"],
error_stg_fpath,
device_usg_configs["bucket_no_gs"],
error_archive_fpath,
)
# Lastly, fail the job if there were 5 or more error rows so the failure is surfaced
if int(nbr_errors) > 4:
raise Exception("5 or more errors, check for issues")