in adjust_report_etl/connector.py [0:0]
# Dimension columns copied verbatim from each CSV row into the upserted record.
_REPORT_DIMENSION_COLUMNS = (
    "adgroup",
    "adgroup_network",
    "app",
    "app_token",
    "campaign",
    "campaign_network",
    "channel",
    "country",
    "creative",
    "creative_network",
    "currency",
    "day",
    "network",
    "partner_name",
    "source_network",
    "store_type",
)

# Metric column -> converter for the CSV report, in the order keys are emitted.
_CSV_REPORT_METRIC_CONVERTERS = {
    "clicks": int,
    "impressions": int,
    "installs": int,
    "cost": float,
    "ad_revenue": float,
    "revenue": float,
    "att_status_authorized": int,
    "att_status_denied": int,
}

# Metric column -> converter for the SKAD report, in the order keys are emitted.
_SKAD_REPORT_METRIC_CONVERTERS = {
    "skad_installs": int,
    "skad_total_installs": int,
    "valid_conversions": int,
    "conversion_1": int,
    "conversion_2": int,
    "conversion_3": int,
    "conversion_4": int,
    "conversion_5": int,
    "conversion_6": int,
}

# (connect, read) timeout in seconds, so a stalled connection cannot hang the
# sync forever. NOTE(review): the read timeout is a guess; raise it if large
# reports legitimately take longer to generate.
_ADJUST_REQUEST_TIMEOUT = (10, 600)


def _fetch_adjust_report(url, api_key, params, report_label):
    """Download one report and return its body text, or None on any failure.

    Failures are logged here; the caller only needs to check for None.
    """
    try:
        log.info(f"Fetching data from Adjust for {report_label} report...")
        response = requests.get(
            url,
            headers={"Authorization": f"Bearer {api_key}"},
            params=params,
            timeout=_ADJUST_REQUEST_TIMEOUT,
        )
        log.info(f"Received response with status code: {response.status_code}")
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        log.severe(f"HTTP error occurred: {err}")
        # Read the body from the exception so this never touches an unbound name.
        if err.response is not None:
            log.severe(f"Response content: {err.response.text}")
        return None
    except Exception as err:
        log.severe(f"Other error occurred: {err}")
        return None
    return response.text


def _build_report_record(row, metric_converters) -> dict:
    """Build the upsert payload for one CSV row: dimensions as-is, metrics converted."""
    record = {column: row[column] for column in _REPORT_DIMENSION_COLUMNS}
    for column, convert in metric_converters.items():
        record[column] = convert(row[column])
    return record


def _upsert_report_rows(csv_text, table, metric_converters):
    """Parse report CSV text and yield one upsert operation per row.

    Any error (missing column, unparsable number, ...) is logged and ends
    this report; rows already yielded are not rolled back.
    """
    try:
        log.info("Processing Adjust data...")
        report_data = list(csv.DictReader(StringIO(csv_text)))
        log.info("Upserting rows to bigquery...")
        for row in report_data:
            yield op.upsert(
                table=table,
                data=_build_report_record(row, metric_converters),
            )
        log.info(f"Completed upserting {len(report_data)} rows to BigQuery.")
    except Exception as err:
        log.severe(f"Other error occurred: {err}")


def update(configuration: dict, state: dict):
    """Fetch the Adjust CSV and SKAD reports and yield an upsert per row.

    Args:
        configuration: Connector configuration; must contain non-empty
            "API_KEY" and "APP_TOKEN" entries.
        state: Accepted for interface compatibility; this function does not
            read or update it, and every run requests the full DATE_PERIOD.

    Yields:
        The result of ``op.upsert(...)`` for each row of each report.

    A missing credential is logged and ends the run without yielding. A
    report whose download fails is logged and skipped; the other report is
    still attempted.
    """
    ADJUST_API_URL = "https://automate.adjust.com/reports-service/csv_report"
    AD_SPEND_MODE = "network"
    DATE_PERIOD = "2023-04-01:-0d"

    # .get() so an absent key reaches the "not provided" branches below
    # instead of raising KeyError.
    API_KEY = configuration.get("API_KEY")
    APP_TOKEN = configuration.get("APP_TOKEN")
    if not API_KEY:
        log.severe("Api key not provided")
        return
    if not APP_TOKEN:
        log.severe("App token not provided")
        return

    reports = (
        ("CSV", CSV_REPORT_METRICS, CSV_REPORT_TABLE_NAME, _CSV_REPORT_METRIC_CONVERTERS),
        ("SKAD", SKAD_REPORT_METRICS, SKAD_REPORT_TABLE_NAME, _SKAD_REPORT_METRIC_CONVERTERS),
    )
    for report_label, metrics, table, metric_converters in reports:
        params = {
            "ad_spend_mode": AD_SPEND_MODE,
            "app_token__in": APP_TOKEN,
            "date_period": DATE_PERIOD,
            "dimensions": ",".join(DIMENSIONS),
            "metrics": ",".join(metrics),
        }
        csv_text = _fetch_adjust_report(ADJUST_API_URL, API_KEY, params, report_label)
        if csv_text is None:
            # The fetch already logged the failure; there is nothing valid to parse.
            continue
        yield from _upsert_report_rows(csv_text, table, metric_converters)