in sql/moz-fx-data-shared-prod/cloudflare_derived/device_usage_v1/query.py [0:0]
def get_device_usage_data(date_of_interest, auth_token):
"""Call API and retrieve device usage data and save both errors & results to GCS."""
# Calculate start date and end date
logical_dag_dt = date_of_interest
logical_dag_dt_as_date = datetime.strptime(logical_dag_dt, "%Y-%m-%d").date()
start_date = logical_dag_dt_as_date - timedelta(days=4)
end_date = start_date + timedelta(days=1)
print("Start Date: ", start_date)
print("End Date: ", end_date)
# Configure request headers
bearer_string = f"Bearer {auth_token}"
headers = {"Authorization": bearer_string}
# Initialize the empty results & errors dataframes
results_df = pd.DataFrame(
{
"Timestamp": [],
"UserType": [],
"Location": [],
"DesktopUsagePct": [],
"MobileUsagePct": [],
"OtherUsagePct": [],
"ConfLevel": [],
"AggInterval": [],
"NormalizationType": [],
"LastUpdated": [],
}
)
errors_df = pd.DataFrame({"StartTime": [], "EndTime": [], "Location": []})
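# Pre-declaring the (empty) schemas means the CSVs written to GCS below still
# have the expected headers even if every location errors out or returns no rows.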
# For each location, call the API to get device usage data
for loc in device_usg_configs["locations"]:
print("Loc: ", loc)
# Generate the URL
device_usage_api_url = generate_device_type_timeseries_api_call(
start_date, end_date, "1d", loc
)
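# The helper is assumed to build a Cloudflare Radar device_type timeseries URL,
# roughly of the form
# https://api.cloudflare.com/client/v4/radar/http/timeseries_groups/device_type
# ?dateStart=...&dateEnd=...&aggInterval=1d&location=<loc>&format=json
# (the exact endpoint and parameters depend on generate_device_type_timeseries_api_call).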
try:
# Call the API and save the response as JSON
response = requests.get(
device_usage_api_url,
headers=headers,
timeout=device_usg_configs["timeout_limit"],
)
response_json = response.json()
# If response was successful, get the result
if response_json["success"] is True:
result = response_json["result"]
human_ts, human_dsktp, human_mbl, human_othr = (
parse_device_type_timeseries_response_human(result)
)
bot_ts, bot_dsktp, bot_mbl, bot_othr = (
parse_device_type_timeseries_response_bot(result)
)
conf_lvl = result["meta"]["confidenceInfo"]["level"]
aggr_intvl = result["meta"]["aggInterval"]
nrmlztn = result["meta"]["normalization"]
lst_upd = result["meta"]["lastUpdated"]
# Build result dataframes for human and bot traffic
human_result_df = make_device_usage_result_df(
"Human",
human_dsktp,
human_mbl,
human_othr,
human_ts,
lst_upd,
nrmlztn,
conf_lvl,
aggr_intvl,
loc,
)
bot_result_df = make_device_usage_result_df(
"Bot",
bot_dsktp,
bot_mbl,
bot_othr,
bot_ts,
lst_upd,
nrmlztn,
conf_lvl,
aggr_intvl,
loc,
)
# Union the results
new_result_df = pd.concat(
[human_result_df, bot_result_df], ignore_index=True, sort=False
)
# Add results to the results dataframe
results_df = pd.concat([results_df, new_result_df])
# If response was not successful, save to the errors dataframe
else:
new_errors_df = pd.DataFrame(
{
"StartTime": [start_date],
"EndTime": [end_date],
"Location": [loc],
}
)
errors_df = pd.concat([errors_df, new_errors_df])
except Exception as e:
# Any request or parsing failure for this location is logged and recorded as an error row
print("Exception for location ", loc, ": ", e)
new_errors_df = pd.DataFrame(
{"StartTime": [start_date], "EndTime": [end_date], "Location": [loc]}
)
errors_df = pd.concat([errors_df, new_errors_df])
# LOAD RESULTS & ERRORS TO STAGING GCS
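# NOTE: writing directly to GCS with to_csv assumes the configured bucket is a
# gs:// URI and that gcsfs is installed so pandas/fsspec can resolve the path.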
result_fpath = device_usg_configs["bucket"] + (
device_usg_configs["results_stg_gcs_fpth"] % (start_date, logical_dag_dt)
)
error_fpath = device_usg_configs["bucket"] + (
device_usg_configs["errors_stg_gcs_fpth"] % (start_date, logical_dag_dt)
)
results_df.to_csv(result_fpath, index=False)
errors_df.to_csv(error_fpath, index=False)
print("Wrote errors to: ", error_fpath)
print("Wrote results to: ", result_fpath)
# Return the number of results and errors (as strings) as a summary
len_results = str(len(results_df))
len_errors = str(len(errors_df))
results_summary = [len_results, len_errors]
return results_summary
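# --- Hypothetical usage sketch (not part of the original module) ---
# A minimal illustration of how get_device_usage_data might be invoked as a
# standalone script; the argparse entry point and the --date /
# --cloudflare_api_token flag names are assumptions for illustration only.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Pull Cloudflare device usage data")
    parser.add_argument("--date", required=True, help="Logical DAG date, YYYY-MM-DD")
    parser.add_argument(
        "--cloudflare_api_token", required=True, help="Cloudflare API bearer token"
    )
    args = parser.parse_args()

    result_count, error_count = get_device_usage_data(args.date, args.cloudflare_api_token)
    print("Result rows: ", result_count, " Error rows: ", error_count)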