in sql/moz-fx-data-shared-prod/cloudflare_derived/os_usage_v1/query.py [0:0]
def main():
"""Call the API, save data to GCS, load to BQ staging, delete & load to BQ gold"""
parser = ArgumentParser(description=__doc__)
parser.add_argument("--date", required=True)
parser.add_argument("--cloudflare_api_token", default=cloudflare_api_token)
parser.add_argument("--project", default="moz-fx-data-shared-prod")
parser.add_argument("--dataset", default="cloudflare_derived")
args = parser.parse_args()
print("Running for date: ")
print(args.date)
# STEP 1 - Pull the data from the API, save results & errors to GCS staging area
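    # get_os_usage_data (defined elsewhere in this module) pulls the Cloudflare
    # API data and writes the result and error CSVs to the GCS staging paths
    # referenced below, returning a summary for logging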
result_summary = get_os_usage_data(args.date, args.cloudflare_api_token)
print("result_summary")
print(result_summary)
    # Create a BigQuery client used for the load, delete, and insert jobs below
client = bigquery.Client(args.project)
    # The data date is the run date minus 4 days; compute it once and reuse it
    # for the GCS paths below
    data_date = datetime.strptime(args.date, "%Y-%m-%d").date() - timedelta(days=4)
    result_uri = os_usg_configs["bucket"] + os_usg_configs["results_stg_gcs_fpth"] % (
        data_date,
        args.date,
    )
    error_uri = os_usg_configs["bucket"] + os_usg_configs["errors_stg_gcs_fpth"] % (
        data_date,
        args.date,
    )
print("result_uri")
print(result_uri)
print("error_uri")
print(error_uri)
# STEP 2 - Copy the result data from GCS staging to BQ staging table
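    # load_table_from_uri starts an asynchronous BigQuery load job; the
    # .result() call below blocks until it completes. skip_leading_rows=1
    # skips the CSV header row, and WRITE_TRUNCATE replaces any previous
    # staging contents, so the staging table only ever holds this run's pull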
load_result_csv_to_bq_stg_job = client.load_table_from_uri(
result_uri,
os_usg_configs["results_bq_stg_table"],
job_config=bigquery.LoadJobConfig(
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
schema=[
{"name": "Timestamps", "type": "TIMESTAMP", "mode": "REQUIRED"},
{"name": "OS", "type": "STRING", "mode": "NULLABLE"},
{"name": "Location", "type": "STRING", "mode": "NULLABLE"},
{"name": "DeviceType", "type": "STRING", "mode": "NULLABLE"},
{"name": "Share", "type": "NUMERIC", "mode": "NULLABLE"},
{"name": "ConfidenceLevel", "type": "STRING", "mode": "NULLABLE"},
{"name": "AggrInterval", "type": "STRING", "mode": "NULLABLE"},
{"name": "Normalization", "type": "STRING", "mode": "NULLABLE"},
{"name": "LastUpdatedTS", "type": "TIMESTAMP", "mode": "NULLABLE"},
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
),
)
load_result_csv_to_bq_stg_job.result()
result_bq_stg_tbl = client.get_table(os_usg_configs["results_bq_stg_table"])
print("Loaded {} rows to results staging.".format(result_bq_stg_tbl.num_rows))
# STEP 3 - Copy the error data from GCS staging to BQ staging table
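    # Same load pattern as STEP 2, but for the error CSV, which appears to
    # record the date range, location, and device type of failed API pulls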
load_error_csv_to_bq_stg_job = client.load_table_from_uri(
error_uri,
os_usg_configs["errors_bq_stg_table"],
job_config=bigquery.LoadJobConfig(
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
schema=[
{"name": "StartDate", "type": "DATE", "mode": "REQUIRED"},
{"name": "EndDate", "type": "DATE", "mode": "REQUIRED"},
{"name": "Location", "type": "STRING", "mode": "NULLABLE"},
{"name": "DeviceType", "type": "STRING", "mode": "NULLABLE"},
],
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
),
)
load_error_csv_to_bq_stg_job.result()
error_bq_stg_tbl = client.get_table(os_usg_configs["errors_bq_stg_table"])
print("Loaded {} rows to errors staging.".format(error_bq_stg_tbl.num_rows))
    # STEP 4 - Delete any existing results for this data date from gold, so reruns are idempotent
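    # The DATE_SUB(..., INTERVAL 4 DAY) here mirrors the 4-day offset used for
    # the GCS paths above, so the deletes target the same data date the
    # inserts in STEPs 6 and 7 will load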
del_exstng_gold_res_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.os_usage_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
del_gold_res_job = client.query(del_exstng_gold_res_for_date)
del_gold_res_job.result()
print("Deleted anything already existing for this date from results gold")
    # STEP 5 - Delete any existing errors for this data date from gold, so reruns are idempotent
del_exstng_gold_err_for_date = f"""DELETE FROM `moz-fx-data-shared-prod.cloudflare_derived.os_usage_errors_v1` WHERE dte = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
del_gold_err_job = client.query(del_exstng_gold_err_for_date)
del_gold_err_job.result()
print("Deleted anything already existing for this date from errors gold")
    # STEP 6 - Load results from stage to gold  # TODO: update this
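    # Note: ConfidenceLevel and AggrInterval are loaded to staging but are not
    # carried into the gold table by the SELECT below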
os_usg_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.os_usage_v1`
SELECT
    CAST(Timestamps AS DATE) AS dte,
OS AS os,
Location AS location,
DeviceType AS device_type,
Share AS os_share,
Normalization AS normalization_type,
LastUpdatedTS AS last_updated_ts
FROM `moz-fx-data-shared-prod.cloudflare_derived.os_results_stg`
    WHERE CAST(Timestamps AS DATE) = DATE_SUB('{args.date}', INTERVAL 4 DAY)"""
load_res_to_gold = client.query(os_usg_stg_to_gold_query)
load_res_to_gold.result()
# STEP 7 - Load errors from stage to gold
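    # Only StartDate is carried into gold (as dte); EndDate exists only in the
    # staging table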
os_usg_errors_stg_to_gold_query = f""" INSERT INTO `moz-fx-data-shared-prod.cloudflare_derived.os_usage_errors_v1`
SELECT
StartDate AS dte,
Location AS location,
DeviceType AS device_type
FROM `moz-fx-data-shared-prod.cloudflare_derived.os_errors_stg`
WHERE StartDate = DATE_SUB('{args.date}', INTERVAL 4 DAY) """
load_err_to_gold = client.query(os_usg_errors_stg_to_gold_query)
load_err_to_gold.result()
# STEP 8 - Copy the result CSV from stage to archive, then delete from stage
    # Compute the staging and archive file paths ahead of time
    result_stg_fpath = os_usg_configs["results_stg_gcs_fpth"] % (
        data_date,
        args.date,
    )
    result_archive_fpath = os_usg_configs["results_archive_gcs_fpth"] % (
        data_date,
        args.date,
    )
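    # move_blob (defined elsewhere in this module) copies a blob to the
    # destination path and deletes the source copy, per the step comments above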
move_blob(
os_usg_configs["bucket_no_gs"],
result_stg_fpath,
os_usg_configs["bucket_no_gs"],
result_archive_fpath,
)
# STEP 9 - Copy the error CSV from stage to archive, then delete from stage
    error_stg_fpath = os_usg_configs["errors_stg_gcs_fpth"] % (
        data_date,
        args.date,
    )
    error_archive_fpath = os_usg_configs["errors_archive_gcs_fpth"] % (
        data_date,
        args.date,
    )
move_blob(
os_usg_configs["bucket_no_gs"],
error_stg_fpath,
os_usg_configs["bucket_no_gs"],
error_archive_fpath,
)
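
# A minimal sketch of what move_blob presumably does, assuming the standard
# google-cloud-storage copy-then-delete pattern (names illustrative only, not
# the actual helper's implementation):
#
#     from google.cloud import storage
#
#     def move_blob(src_bucket_name, src_path, dst_bucket_name, dst_path):
#         """Copy the source blob to the destination, then delete the source."""
#         client = storage.Client()
#         src_bucket = client.bucket(src_bucket_name)
#         dst_bucket = client.bucket(dst_bucket_name)
#         blob = src_bucket.blob(src_path)
#         src_bucket.copy_blob(blob, dst_bucket, dst_path)
#         blob.delete()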