sql/moz-fx-data-shared-prod/adjust_derived/adjust_deliverables_v1/query.py:
"""Adjust data - download deliverables, clean and upload to BigQuery."""
import csv
import json
import tempfile
from argparse import ArgumentParser
import requests
from google.cloud import bigquery
API_URI = "api.adjust.com"
ENDPOINT = "kpis"
API_VERSION = "v1"
"""The APP_TOKEN_LIST is a list of dictionaries.
Keys are the app_name, and app_token in the form:
{"app_name":"<specific app name from Adjust dash>". "app_token":"<unique Adjust app token>}"
Look here to see the apps we are tracking: https://dash.adjust.com/#/.
It is important to maintain this list in order for the script to work especially in the case of new apps
being added to track in Adjust
"""
CSV_FIELDS = [
"date",
"app",
"app_token",
"network",
"network_token",
"campaign",
"campaign_token",
"adgroup",
"adgroup_token",
"creative",
"creative_token",
"country",
"os",
"device",
"clicks",
"installs",
"limit_ad_tracking_install_rate",
"click_conversion_rate",
"impression_conversion_rate",
"sessions",
"daus",
"waus",
"maus",
]
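# CSV columns are written in this order, so it must line up with the BigQuery
# schema in upload_to_bigquery; the field names must match the keys of the row
# dicts produced by clean_json.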
def read_json(filename: str) -> dict:
"""Read JSON file."""
with open(filename, "r") as f:
data = json.loads(f.read())
return data
def write_dict_to_csv(json_data, filename):
"""Write a dictionary to a csv."""
with open(filename, "w") as out_file:
dict_writer = csv.DictWriter(out_file, CSV_FIELDS)
dict_writer.writeheader()
dict_writer.writerows(json_data)
def download_adjust_kpi_data(date, api_token, app_token):
"""Download data from Adjust - API token and APP tokens are called here."""
start_date = date
end_date = date
kpis = [
"clicks",
"installs",
"limit_ad_tracking_install_rate",
"click_conversion_rate",
"impression_conversion_rate",
"sessions",
"daus",
"waus",
"maus",
]
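    # The position of each kpi in this list determines the kpi_values index
    # used when the response is flattened in clean_json.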
groupings = [
"days",
"apps",
"networks",
"campaigns",
"adgroups",
"creatives",
"countries",
"os_names",
"device_types",
]
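    # clean_json expects the response nested in this grouping order
    # (dates > apps > networks > campaigns > adgroups > creatives >
    # countries > os_names > device_types).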
    # Get overview metrics for the different KPIs (deliverables).
    url = f"https://{API_URI}/{ENDPOINT}/{API_VERSION}/{app_token}"  # overview endpoint
url_params = f"start_date={start_date}&end_date={end_date}&kpis={','.join(kpis)}&grouping={','.join(groupings)}"
headers = {
"Authorization": f"Token token={api_token}",
}
response = requests.get(url, headers=headers, params=url_params)
    if response.status_code in (400, 401):
print(f"***Error: {response.status_code}***")
print(response.text)
return response
def check_json(adjust_response_text):
    """Adjust may return an empty (non-JSON) body for apps on days with no data. Check for that here."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json") as tmp_json:
        tmp_json.write(adjust_response_text)
        # Flush so read_json sees the complete contents; the buffered write may
        # not have reached disk yet otherwise.
        tmp_json.flush()
        try:
            query_export = read_json(tmp_json.name)
        except ValueError:
            # e.g. json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
            return None
    return query_export
def clean_json(query_export):
"""JSON sometimes has missing keys, need to clean up the data."""
fields_list = []
for date in query_export["result_set"]["dates"]:
r_date = date["date"]
for app in date["apps"]:
r_app = app.get("name", "no_app_name")
r_app_token = app.get("token", "no_app_token")
for network in app["networks"]:
r_network = network.get("name", "no_network_name")
r_network_token = network.get("token", "no_network_token")
for campaign in network["campaigns"]:
r_campaign = campaign.get("name", "no_campaign_name")
r_campaign_token = campaign.get("token", "no_campaign_token")
for adgroup in campaign["adgroups"]:
r_adgroup = adgroup.get("name", "no_ad_group_name")
r_adgroup_token = adgroup.get("token", "no_adgroup_token")
for creative in adgroup["creatives"]:
r_creative = creative.get("name", "no_creative_name")
r_creative_token = creative.get(
"token", "no_creative_token"
)
for country in creative["countries"]:
r_country = country["country"]
for os_name in country["os_names"]:
r_os_name = os_name["os_name"]
for device in os_name["device_types"]:
r_device = device["device_type"]
                                        kpi_values = device["kpi_values"]
                                        field_dict = {
                                            "date": r_date,
                                            "app": r_app,
                                            "app_token": r_app_token,
                                            "network": r_network,
                                            "network_token": r_network_token,
                                            "campaign": r_campaign,
                                            "campaign_token": r_campaign_token,
                                            "adgroup": r_adgroup,
                                            "adgroup_token": r_adgroup_token,
                                            "creative": r_creative,
                                            "creative_token": r_creative_token,
                                            "country": r_country,
                                            "os": r_os_name,
                                            "device": r_device,
                                            "clicks": kpi_values[0],
                                            "installs": kpi_values[1],
                                            "limit_ad_tracking_install_rate": kpi_values[2],
                                            "click_conversion_rate": kpi_values[3],
                                            "impression_conversion_rate": kpi_values[4],
                                            "sessions": kpi_values[5],
                                            "daus": kpi_values[6],
                                            "waus": kpi_values[7],
                                            "maus": kpi_values[8],
                                        }
fields_list.append(field_dict)
return fields_list
def upload_to_bigquery(csv_data, project, dataset, date):
"""Upload the data to bigquery."""
print("writing json to csv")
partition = f"{date}".replace("-", "")
    with tempfile.NamedTemporaryFile() as tmp_csv:
        with open(tmp_csv.name, "w+b") as f_csv:
            # write_dict_to_csv opens the file by name, writes, and closes it;
            # rewind f_csv explicitly before streaming it to BigQuery.
            write_dict_to_csv(csv_data, f_csv.name)
            f_csv.seek(0)
client = bigquery.Client(project)
job_config = bigquery.LoadJobConfig(
create_disposition="CREATE_IF_NEEDED",
write_disposition="WRITE_TRUNCATE",
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="date",
),
skip_leading_rows=1,
schema=[
bigquery.SchemaField("date", "DATE"),
bigquery.SchemaField("app", "STRING"),
bigquery.SchemaField("app_token", "STRING"),
bigquery.SchemaField("network", "STRING"),
bigquery.SchemaField("network_token", "STRING"),
bigquery.SchemaField("campaign", "STRING"),
bigquery.SchemaField("campaign_token", "STRING"),
bigquery.SchemaField("adgroup", "STRING"),
bigquery.SchemaField("adgroup_token", "STRING"),
bigquery.SchemaField("creative", "STRING"),
bigquery.SchemaField("creative_token", "STRING"),
bigquery.SchemaField("country", "STRING"),
bigquery.SchemaField("os", "STRING"),
bigquery.SchemaField("device", "STRING"),
bigquery.SchemaField("clicks", "INTEGER"),
bigquery.SchemaField("installs", "INTEGER"),
bigquery.SchemaField("limit_ad_tracking_install_rate", "FLOAT"),
bigquery.SchemaField("click_conversion_rate", "FLOAT"),
bigquery.SchemaField("impression_conversion_rate", "FLOAT"),
bigquery.SchemaField("sessions", "INTEGER"),
bigquery.SchemaField("daus", "INTEGER"),
bigquery.SchemaField("waus", "INTEGER"),
bigquery.SchemaField("maus", "INTEGER"),
],
)
            # Load into this date's partition of the adjust_deliverables_v1 table.
destination = f"{project}.{dataset}.adjust_deliverables_v1${partition}"
job = client.load_table_from_file(f_csv, destination, job_config=job_config)
print(
f"Writing adjust data for all apps to {destination}. BigQuery job ID: {job.job_id}"
)
job.result()
def main():
"""Input data, call functions, get stuff done."""
parser = ArgumentParser(description=__doc__)
parser.add_argument("--date", required=True)
parser.add_argument("--adjust_api_token", required=True)
parser.add_argument("--adjust_app_list", required=True)
parser.add_argument("--project", default="moz-fx-data-shared-prod")
parser.add_argument("--dataset", default="adjust_derived")
args = parser.parse_args()
app_list = json.loads(args.adjust_app_list)
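    # --adjust_app_list is a JSON string in the format documented at the top of
    # this file: a list of {"app_name": ..., "app_token": ...} dicts.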
data = []
# Cycle through the apps to get the relevant kpi data
for app in app_list:
        print(f'Downloading data for {app["app_name"]}')
# Ping the Adjust URL and get a response
        api_response = download_adjust_kpi_data(
            args.date, args.adjust_api_token, app["app_token"]
        )
        query_export = check_json(api_response.text)
if query_export is not None:
            # Flatten the cleaned JSON into rows; upload_to_bigquery later writes
            # them to a temp CSV and loads that into the BigQuery table.
adjust_data = clean_json(query_export)
data.extend(adjust_data)
else:
print(f'no data for {app["app_name"]} today')
upload_to_bigquery(data, args.project, args.dataset, args.date)
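

# Example invocation (all values are placeholders):
#   python query.py --date 2023-01-01 \
#       --adjust_api_token <api_token> \
#       --adjust_app_list '[{"app_name": "Firefox Android", "app_token": "abc123xyz"}]'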
if __name__ == "__main__":
main()