sql/moz-fx-data-shared-prod/microsoft_derived/app_conversions_v1/query.py (170 lines of code) (raw):

"""microsoft_store data - download deliverables, clean and upload to BigQuery."""

import csv
import json
import os
import re
import tempfile
from argparse import ArgumentParser
from time import sleep

import requests
from google.cloud import bigquery

API_URI = "https://manage.devcenter.microsoft.com"

# MICROSOFT_STORE_APP_LIST is a JSON list of dictionaries, one per app, in the form:
#   {"app_name": "<app name from the Microsoft Store>", "app_id": "<Microsoft Store application ID>"}
# The Azure AD tenant is supplied separately via MICROSOFT_TENANT_ID.
# Look here to see the apps we are tracking:
# https://partner.microsoft.com/en-us/dashboard/insights/analytics/reports/summary

CSV_FIELDS = [
    "date",
    "application_id",
    "application_name",
    "custom_campaign_id",
    "referrer_uri_domain",
    "channel_type",
    "store_client",
    "device_type",
    "market",
    "click_count",
    "conversion_count",
]

MS_CLIENT_ID = os.environ.get("MICROSOFT_CLIENT_ID")
MS_CLIENT_SECRET = os.environ.get("MICROSOFT_CLIENT_SECRET")
MS_APP_LIST = os.environ.get("MICROSOFT_STORE_APP_LIST")
MS_TENANT_ID = os.environ.get("MICROSOFT_TENANT_ID")

# Seconds to wait on the Microsoft endpoints. Without a timeout `requests`
# can block indefinitely on a stalled connection.
REQUEST_TIMEOUT_SECONDS = 120


def post_response(url, headers, data, timeout=REQUEST_TIMEOUT_SECONDS):
    """Send a POST request and return the response.

    400/401 responses are logged (status and body) but not raised; callers
    decide how to handle failures.
    """
    response = requests.post(url, headers=headers, data=data, timeout=timeout)
    if response.status_code in (400, 401):
        print(f"***Error: {response.status_code}***")
        print(response.text)
    return response


def get_response(url, headers, params, timeout=REQUEST_TIMEOUT_SECONDS):
    """Send a GET request and return the response.

    400/401 responses are logged (status and body) but not raised; callers
    decide how to handle failures.
    """
    response = requests.get(url, headers=headers, params=params, timeout=timeout)
    if response.status_code in (400, 401):
        print(f"***Error: {response.status_code}***")
        print(response.text)
    return response


def read_json(filename: str) -> dict:
    """Read a JSON file, echo its raw text, and return the parsed contents."""
    # Read once: calling f.read() a second time would return "" because the
    # first read already consumed the file.
    with open(filename, "r", encoding="utf-8") as f:
        contents = f.read()
    print(contents)
    return json.loads(contents)


def _replace_none(value):
    """Recursively replace None with the string "null" inside parsed JSON."""
    if value is None:
        return "null"
    if isinstance(value, dict):
        return {key: _replace_none(item) for key, item in value.items()}
    if isinstance(value, list):
        return [_replace_none(item) for item in value]
    return value


def change_null_to_string(json_string):
    """Return json_string with every JSON null value replaced by the string "null".

    The JSON is parsed rather than edited textually, so string values that
    merely contain the letters "null" (e.g. "annulled") are left intact.
    Raises json.JSONDecodeError if json_string is not valid JSON.
    """
    return json.dumps(_replace_none(json.loads(json_string)))


def write_dict_to_csv(json_data, filename):
    """Write row dictionaries (keyed by CSV_FIELDS) to a CSV file with a header row."""
    # newline="" lets the csv module control line endings itself; UTF-8 is
    # BigQuery's default encoding for CSV loads.
    with open(filename, "w", newline="", encoding="utf-8") as out_file:
        dict_writer = csv.DictWriter(out_file, CSV_FIELDS)
        dict_writer.writeheader()
        dict_writer.writerows(json_data)


def microsoft_authorization(tenant_id, client_id, client_secret, resource_url):
    """Microsoft Store Authorization. Returns the bearer token required for data download.

    Uses the client-credentials grant against the tenant's OAuth2 token endpoint.
    Raises requests.HTTPError if the token endpoint returns an error status.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
    query_params = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "resource": resource_url,
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = post_response(url, headers, query_params)
    # Surface the HTTP failure instead of an opaque KeyError on "access_token".
    response.raise_for_status()
    return response.json()["access_token"]


def download_microsoft_store_data(date, application_id, bearer_token):
    """Request one day of app channel conversion data for a single application.

    Returns the raw requests.Response; the caller is responsible for checking
    its status. Nothing here waits for data to become available: the job has
    to run late enough that Microsoft has published data for `date`.
    """
    url = f"{API_URI}/v1.0/my/analytics/appchannelconversions"
    # NOTE(review): only the first page is requested (skip=0, no paging) —
    # confirm a single app/day never exceeds the API's page size.
    url_params = {
        "applicationId": application_id,
        "aggregationLevel": "day",
        "startDate": date,
        "endDate": date,
        "skip": 0,
    }
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": f"Bearer {bearer_token}",
    }
    print(f"{url}?applicationId={application_id}")
    return get_response(url, headers, url_params)


def clean_json(query_export, date):
    """Map each record in query_export["Value"] to a CSV row dict for the given date."""
    return [
        {
            "date": date,
            "application_id": val["applicationId"],
            "application_name": val["applicationName"],
            "custom_campaign_id": val["customCampaignId"],
            "referrer_uri_domain": val["referrerUriDomain"],
            "channel_type": val["channelType"],
            "store_client": val["storeClient"],
            "device_type": val["deviceType"],
            "market": val["market"],
            "click_count": val["clickCount"],
            "conversion_count": val["conversionCount"],
        }
        for val in query_export["Value"]
    ]


def upload_to_bigquery(csv_data, project, dataset, table_name, date):
    """Write rows to a temporary CSV and load them into the `date` partition.

    The partition is overwritten (WRITE_TRUNCATE), so re-running a date
    replaces that day's data. Blocks until the load job finishes.
    """
    print("writing json to csv")
    # Partition decorator expects YYYYMMDD.
    partition = f"{date}".replace("-", "")
    print(partition)
    with tempfile.NamedTemporaryFile(suffix=".csv") as tmp_csv:
        # Write through a separate, closed-on-return handle, then reopen for
        # reading so the load job sees the fully flushed file.
        write_dict_to_csv(csv_data, tmp_csv.name)
        client = bigquery.Client(project)
        job_config = bigquery.LoadJobConfig(
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_TRUNCATE",
            time_partitioning=bigquery.TimePartitioning(
                type_=bigquery.TimePartitioningType.DAY,
                field="date",
            ),
            skip_leading_rows=1,
            schema=[
                bigquery.SchemaField("date", "DATE"),
                bigquery.SchemaField("application_id", "STRING"),
                bigquery.SchemaField("application_name", "STRING"),
                bigquery.SchemaField("custom_campaign_id", "STRING"),
                bigquery.SchemaField("referrer_uri_domain", "STRING"),
                bigquery.SchemaField("channel_type", "STRING"),
                bigquery.SchemaField("store_client", "STRING"),
                bigquery.SchemaField("device_type", "STRING"),
                bigquery.SchemaField("market", "STRING"),
                bigquery.SchemaField("click_count", "INT64"),
                bigquery.SchemaField("conversion_count", "INT64"),
            ],
        )
        destination = f"{project}.{dataset}.{table_name}${partition}"
        with open(tmp_csv.name, "rb") as f_csv:
            job = client.load_table_from_file(
                f_csv, destination, job_config=job_config
            )
            print(
                f"Writing microsoft_store data for all apps to {destination}. BigQuery job ID: {job.job_id}"
            )
            job.result()


def main():
    """Input data, call functions, get stuff done."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--date", required=True)
    parser.add_argument("--project", default="moz-fx-data-shared-prod")
    parser.add_argument("--dataset", default="microsoft_derived")
    args = parser.parse_args()

    project = args.project
    dataset = args.dataset
    table_name = "app_conversions_v1"
    date = args.date

    # The date is used both in the API query and in the "$YYYYMMDD"
    # partition decorator, so reject anything that isn't YYYY-MM-DD.
    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", date):
        parser.error("--date must be in YYYY-MM-DD format")

    missing = [
        name
        for name, value in (
            ("MICROSOFT_CLIENT_ID", MS_CLIENT_ID),
            ("MICROSOFT_CLIENT_SECRET", MS_CLIENT_SECRET),
            ("MICROSOFT_STORE_APP_LIST", MS_APP_LIST),
            ("MICROSOFT_TENANT_ID", MS_TENANT_ID),
        )
        if not value
    ]
    if missing:
        raise SystemExit(
            f"Missing required environment variable(s): {', '.join(missing)}"
        )

    ms_app_list = json.loads(MS_APP_LIST)
    bearer_token = microsoft_authorization(
        MS_TENANT_ID, MS_CLIENT_ID, MS_CLIENT_SECRET, API_URI
    )

    data = []
    # Cycle through the apps to get the relevant data
    for app in ms_app_list:
        print(f'This is data for {app["app_name"]} - {app["app_id"]} for ', date)
        # Ping the microsoft_store URL and get a response
        response = download_microsoft_store_data(date, app["app_id"], bearer_token)
        response.raise_for_status()
        # The API returns some fields (e.g. the date) as JSON null; they are
        # stored as the string "null". Parse with json.loads — never eval()
        # a network response.
        query_export = json.loads(change_null_to_string(response.text))
        if query_export["Value"]:
            data.extend(clean_json(query_export, date))
        else:
            print("no data for ", date)
        # Pause between per-app requests (presumably to respect API rate limits).
        sleep(5)

    upload_to_bigquery(data, project, dataset, table_name, date)


if __name__ == "__main__":
    main()