in sql/moz-fx-data-shared-prod/cloudflare_derived/os_usage_v1/query.py [0:0]
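# Imports assumed by this function (they may already appear at the top of
# query.py); os_usg_configs and generate_os_timeseries_api_call are expected
# to be defined elsewhere in this module.
from datetime import datetime, timedelta

import pandas as pd
import requests
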
def get_os_usage_data(date_of_interest, auth_token):
"""Pull OS usage data from the Cloudflare API and save errors & results to GCS."""
    # Compute the one-day pull window: start 4 days before the logical DAG
    # date, end 1 day after the start
    logical_dag_dt = date_of_interest
    logical_dag_dt_as_date = datetime.strptime(logical_dag_dt, "%Y-%m-%d").date()
    start_date = logical_dag_dt_as_date - timedelta(days=4)
    end_date = start_date + timedelta(days=1)
    print("Start Date: ", start_date)
    print("End Date: ", end_date)
# Configure request headers
bearer_string = f"Bearer {auth_token}"
headers = {"Authorization": bearer_string}
    # Initialize an empty results dataframe
result_df = pd.DataFrame(
{
"Timestamps": [],
"OS": [],
"Location": [],
"DeviceType": [],
"Share": [],
"ConfidenceLevel": [],
"AggrInterval": [],
"Normalization": [],
"LastUpdatedTS": [],
}
)
# Initialize an errors dataframe
errors_df = pd.DataFrame(
{"StartTime": [], "EndTime": [], "Location": [], "DeviceType": []}
)
# Go through all combinations, submit API requests
for device_type in os_usg_configs["device_types"]:
for loc in os_usg_configs["locations"]:
print("Device Type: ", device_type)
print("Loc: ", loc)
# Generate the URL with given parameters
os_usage_api_url = generate_os_timeseries_api_call(
start_date, end_date, "1d", loc, device_type
)
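            # generate_os_timeseries_api_call is defined elsewhere in this
            # module; presumably it formats a Cloudflare Radar OS-timeseries
            # request URL from these parameters.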
try:
                # Call the API and parse the JSON response
                response = requests.get(
                    os_usage_api_url,
                    headers=headers,
                    timeout=os_usg_configs["timeout_limit"],
                )
                response_json = response.json()
# If response was successful, get the result
if response_json["success"] is True:
result = response_json["result"]
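                    # Expected result shape, inferred from the fields read
                    # below: result["meta"] holds confidenceInfo.level,
                    # aggInterval, normalization, and lastUpdated, while
                    # result["serie_0"] maps "timestamps" plus one key per OS
                    # to parallel lists of values.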
# Parse metadata
conf_lvl = result["meta"]["confidenceInfo"]["level"]
aggr_intvl = result["meta"]["aggInterval"]
nrmlztn = result["meta"]["normalization"]
lst_upd = result["meta"]["lastUpdated"]
data_dict = result["serie_0"]
                    # Every key other than "timestamps" is an OS name mapped to
                    # a parallel list of share values
                    for key, val in data_dict.items():
                        if key != "timestamps":
new_result_df = pd.DataFrame(
{
"Timestamps": data_dict["timestamps"],
"OS": [key] * len(val),
"Location": [loc] * len(val),
"DeviceType": [device_type] * len(val),
"Share": val,
"ConfidenceLevel": [conf_lvl] * len(val),
"AggrInterval": [aggr_intvl] * len(val),
"Normalization": [nrmlztn] * len(val),
"LastUpdatedTS": [lst_upd] * len(val),
}
)
result_df = pd.concat([result_df, new_result_df])
                # If the response was unsuccessful, record the request parameters
                else:
                    # errors = response_json["errors"]  # Maybe capture these too; not used right now
new_errors_df = pd.DataFrame(
{
"StartTime": [start_date],
"EndTime": [end_date],
"Location": [loc],
"DeviceType": [device_type],
}
)
errors_df = pd.concat([errors_df, new_errors_df])
            # Record the request parameters on request failures, JSON decode
            # errors, or unexpected payload shapes
            except (requests.RequestException, ValueError, KeyError):
new_errors_df = pd.DataFrame(
{
"StartTime": [start_date],
"EndTime": [end_date],
"Location": [loc],
"DeviceType": [device_type],
}
)
errors_df = pd.concat([errors_df, new_errors_df])
    # Build the GCS staging paths for the results and errors CSVs
    result_fpath = os_usg_configs["bucket"] + os_usg_configs["results_stg_gcs_fpth"] % (
        start_date,
        logical_dag_dt,
    )
    errors_fpath = os_usg_configs["bucket"] + os_usg_configs["errors_stg_gcs_fpth"] % (
        start_date,
        logical_dag_dt,
    )
result_df.to_csv(result_fpath, index=False)
errors_df.to_csv(errors_fpath, index=False)
print("Wrote errors to: ", errors_fpath)
print("Wrote results to: ", result_fpath)
    # Write a summary to the logs
    result_summary = (
        f"# Result Rows: {len(result_df)}; # of Error Rows: {len(errors_df)}"
    )
    return result_summary
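
# A minimal, hypothetical usage sketch. The env var name and hard-coded date
# below are illustrative assumptions; the real entry point presumably receives
# the date and token from the Airflow DAG or CLI arguments.
if __name__ == "__main__":
    import os

    # CLOUDFLARE_AUTH_TOKEN is an assumed environment variable name
    token = os.environ.get("CLOUDFLARE_AUTH_TOKEN", "")
    summary = get_os_usage_data("2025-01-15", token)
    print(summary)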