adjust_report_etl/connector.py (249 lines of code) (raw):

"""Fivetran connector that syncs Adjust CSV and SKAD reports into a warehouse.

Usage:
    cd adjust_report_etl
    python -m venv myenv
    source myenv/bin/activate
    pip install fivetran-connector-sdk
    fivetran debug --configuration ./configuration/adjust_feast.json
    fivetran debug --configuration ./configuration/adjust_premium.json
    fivetran deploy --api-key xxx --destination GNM --connection adjust_feast --configuration ./configuration/adjust_feast.json
    fivetran deploy --api-key xxx --destination GNM --connection adjust_premium --configuration ./configuration/adjust_premium.json
"""

import csv
import json  # kept for backward compatibility; no longer used (response is CSV)
from io import StringIO

import requests

from fivetran_connector_sdk import Connector
from fivetran_connector_sdk import Logging as log
from fivetran_connector_sdk import Operations as op

log.LOG_LEVEL = log.Level.INFO

# Adjust "automate" reports-service endpoint (returns CSV).
ADJUST_API_URL = "https://automate.adjust.com/reports-service/csv_report"
AD_SPEND_MODE = "network"
# Fixed start date up to "today" ("-0d"). NOTE(review): every sync re-pulls the
# full history and `state` is never checkpointed — presumably intentional
# (full-refresh upserts); confirm before adding incremental state.
DATE_PERIOD = "2023-04-01:-0d"

CSV_REPORT_TABLE_NAME = "csv_report"
# Metric name -> warehouse column type for the plain CSV report.
CSV_REPORT_METRIC_TYPES = {
    "clicks": "INT",
    "impressions": "INT",
    "installs": "INT",
    "cost": "FLOAT",
    "ad_revenue": "FLOAT",
    "revenue": "FLOAT",
    "att_status_authorized": "INT",
    "att_status_denied": "INT",
}
CSV_REPORT_METRICS = list(CSV_REPORT_METRIC_TYPES)

SKAD_REPORT_TABLE_NAME = "skad_report"
# Metric name -> warehouse column type for the SKAdNetwork report.
SKAD_REPORT_METRIC_TYPES = {
    "skad_installs": "INT",
    "skad_total_installs": "INT",
    "valid_conversions": "INT",
    "conversion_1": "INT",
    "conversion_2": "INT",
    "conversion_3": "INT",
    "conversion_4": "INT",
    "conversion_5": "INT",
    "conversion_6": "INT",
}
SKAD_REPORT_METRICS = list(SKAD_REPORT_METRIC_TYPES)

# Dimensions shared by both reports; they also form each table's primary key.
DIMENSIONS = [
    "adgroup",
    "adgroup_network",
    "app",
    "app_token",
    "campaign",
    "campaign_network",
    "channel",
    "country",
    "creative",
    "creative_network",
    "currency",
    "day",
    "network",
    "partner_name",
    "source_network",
    "store_type",
]


def schema(configuration: dict) -> list:
    """Declare the two destination tables (dimensions + typed metrics).

    Args:
        configuration: connector configuration (unused here; required by the SDK).

    Returns:
        A list of Fivetran table definitions. The full dimension set is the
        primary key of both tables, so one Adjust row maps to one warehouse row.
    """
    dimension_columns = {dim: "STRING" for dim in DIMENSIONS}
    return [
        {
            "table": CSV_REPORT_TABLE_NAME,
            "primary_key": DIMENSIONS,
            "columns": {**dimension_columns, **CSV_REPORT_METRIC_TYPES},
        },
        {
            "table": SKAD_REPORT_TABLE_NAME,
            "primary_key": DIMENSIONS,
            "columns": {**dimension_columns, **SKAD_REPORT_METRIC_TYPES},
        },
    ]


def _parse_metric(value, column_type: str):
    """Coerce a raw CSV cell to its declared column type.

    Empty/missing cells become 0 instead of raising, so one blank metric no
    longer aborts the whole report (the old broad `except` silently dropped
    every remaining row on the first bad cell).
    """
    if value is None or value == "":
        return 0.0 if column_type == "FLOAT" else 0
    return float(value) if column_type == "FLOAT" else int(value)


def _fetch_report(api_key: str, app_token: str, metrics, report_label: str):
    """Fetch one report from the Adjust API.

    Args:
        api_key: Adjust bearer token.
        app_token: app token(s) for the `app_token__in` filter.
        metrics: iterable of metric names to request.
        report_label: human-readable label used only in log lines.

    Returns:
        The CSV response body as text, or None on any failure. Returning None
        (instead of falling through) fixes the original bug where a failed
        request left `response` undefined or stale and processing ran anyway.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
    }
    params = {
        "ad_spend_mode": AD_SPEND_MODE,
        "app_token__in": app_token,
        "date_period": DATE_PERIOD,
        "dimensions": ",".join(DIMENSIONS),
        "metrics": ",".join(metrics),
    }
    log.info(f"Fetching data from Adjust for {report_label} report...")
    try:
        response = requests.get(ADJUST_API_URL, headers=headers, params=params)
        log.info(f"Received response with status code: {response.status_code}")
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        log.severe(f"HTTP error occurred: {err}")
        # err.response is always set for HTTPError raised by raise_for_status().
        log.severe(f"Response content: {err.response.text}")
        return None
    except Exception as err:
        # Connection errors, timeouts, etc. — log and skip this report.
        log.severe(f"Other error occurred: {err}")
        return None
    return response.text


def _sync_report(csv_text: str, table_name: str, metric_types: dict):
    """Parse a CSV report body and yield one upsert per row.

    Args:
        csv_text: raw CSV response from the Adjust API.
        table_name: destination table for the upserts.
        metric_types: metric name -> "INT"/"FLOAT" map used for coercion.

    Yields:
        Fivetran upsert operations with all DIMENSIONS as strings and all
        metrics coerced to their declared types.
    """
    try:
        log.info("Processing Adjust data...")
        report_data = list(csv.DictReader(StringIO(csv_text)))
        log.info("Upserting rows to bigquery...")
        for row in report_data:
            data = {dim: row[dim] for dim in DIMENSIONS}
            for metric, column_type in metric_types.items():
                data[metric] = _parse_metric(row[metric], column_type)
            yield op.upsert(table=table_name, data=data)
        log.info(f"Completed upserting {len(report_data)} rows to BigQuery.")
    except Exception as err:
        log.severe(f"Other error occurred: {err}")


def update(configuration: dict, state: dict):
    """Sync both Adjust reports (CSV and SKAD) as Fivetran upserts.

    Args:
        configuration: must contain "API_KEY" and "APP_TOKEN".
        state: Fivetran sync state (unused — every run is a full refresh).

    Yields:
        Upsert operations for CSV_REPORT_TABLE_NAME and SKAD_REPORT_TABLE_NAME.
        A failed fetch for one report no longer breaks the other.
    """
    api_key = configuration.get("API_KEY")
    app_token = configuration.get("APP_TOKEN")
    if not api_key:
        log.severe("Api key not provided")
        return
    if not app_token:
        log.severe("App token not provided")
        return

    reports = (
        (CSV_REPORT_TABLE_NAME, CSV_REPORT_METRIC_TYPES, "CSV"),
        (SKAD_REPORT_TABLE_NAME, SKAD_REPORT_METRIC_TYPES, "SKAD"),
    )
    for table_name, metric_types, label in reports:
        csv_text = _fetch_report(api_key, app_token, list(metric_types), label)
        if csv_text is None:
            continue  # fetch failed and was logged; try the next report
        yield from _sync_report(csv_text, table_name, metric_types)


connector = Connector(update=update, schema=schema)