def get_os_usage_data()

in sql/moz-fx-data-shared-prod/cloudflare_derived/os_usage_v1/query.py [0:0]


def get_os_usage_data(date_of_interest, auth_token):
    """Pull OS usage data from the Cloudflare API and save errors & results to GCS."""
    # Calculate start date and end date
    logical_dag_dt = date_of_interest
    logical_dag_dt_as_date = datetime.strptime(logical_dag_dt, "%Y-%m-%d").date()
    start_date = logical_dag_dt_as_date - timedelta(days=4)
    end_date = start_date + timedelta(days=1)
    print("Start Date: ", start_date)
    print("End Date: ", end_date)

    # Configure request headers
    bearer_string = f"Bearer {auth_token}"
    headers = {"Authorization": bearer_string}

    # Initialize the empty results & errors dataframe
    result_df = pd.DataFrame(
        {
            "Timestamps": [],
            "OS": [],
            "Location": [],
            "DeviceType": [],
            "Share": [],
            "ConfidenceLevel": [],
            "AggrInterval": [],
            "Normalization": [],
            "LastUpdatedTS": [],
        }
    )

    # Initialize an errors dataframe
    errors_df = pd.DataFrame(
        {"StartTime": [], "EndTime": [], "Location": [], "DeviceType": []}
    )

    # Go through all combinations, submit API requests
    for device_type in os_usg_configs["device_types"]:
        for loc in os_usg_configs["locations"]:
            print("Device Type: ", device_type)
            print("Loc: ", loc)

            # Generate the URL with given parameters
            os_usage_api_url = generate_os_timeseries_api_call(
                start_date, end_date, "1d", loc, device_type
            )
            try:
                # Call the API and save the response as JSON
                response = requests.get(
                    os_usage_api_url,
                    headers=headers,
                    timeout=os_usg_configs["timeout_limit"],
                )
                response_json = json.loads(response.text)

                # If response was successful, get the result
                if response_json["success"] is True:
                    result = response_json["result"]
                    # Parse metadata
                    conf_lvl = result["meta"]["confidenceInfo"]["level"]
                    aggr_intvl = result["meta"]["aggInterval"]
                    nrmlztn = result["meta"]["normalization"]
                    lst_upd = result["meta"]["lastUpdated"]
                    data_dict = result["serie_0"]

                    for key, val in data_dict.items():
                        if key != 'timestamps':
                            new_result_df = pd.DataFrame(
                                {
                                    "Timestamps": data_dict["timestamps"],
                                    "OS": [key] * len(val),
                                    "Location": [loc] * len(val),
                                    "DeviceType": [device_type] * len(val),
                                    "Share": val,
                                    "ConfidenceLevel": [conf_lvl] * len(val),
                                    "AggrInterval": [aggr_intvl] * len(val),
                                    "Normalization": [nrmlztn] * len(val),
                                    "LastUpdatedTS": [lst_upd] * len(val),
                                }
                            )
                            result_df = pd.concat([result_df, new_result_df])

                # If response was not successful, get the errors
                else:
                    # errors = response_json["errors"]  # Maybe add to capture, right now not using this
                    new_errors_df = pd.DataFrame(
                        {
                            "StartTime": [start_date],
                            "EndTime": [end_date],
                            "Location": [loc],
                            "DeviceType": [device_type],
                        }
                    )
                    errors_df = pd.concat([errors_df, new_errors_df])
            except:
                new_errors_df = pd.DataFrame(
                    {
                        "StartTime": [start_date],
                        "EndTime": [end_date],
                        "Location": [loc],
                        "DeviceType": [device_type],
                    }
                )
                errors_df = pd.concat([errors_df, new_errors_df])

    result_fpath = os_usg_configs["bucket"] + os_usg_configs["results_stg_gcs_fpth"] % (
        start_date,
        logical_dag_dt,
    )
    errors_fpath = os_usg_configs["bucket"] + os_usg_configs["errors_stg_gcs_fpth"] % (
        start_date,
        logical_dag_dt,
    )

    result_df.to_csv(result_fpath, index=False)
    errors_df.to_csv(errors_fpath, index=False)
    print("Wrote errors to: ", errors_fpath)
    print("Wrote results to: ", result_fpath)

    # Write a summary to the logs
    len_results = str(len(result_df))
    len_errors = str(len(errors_df))
    result_summary = f"# Result Rows: {len_results}; # of Error Rows: {len_errors}"
    return result_summary