def update()

in adjust_report_etl/connector.py
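
The function relies on several module-level names defined elsewhere in connector.py. A minimal sketch of what those definitions might look like, assuming the Fivetran Connector SDK (whose Operations/Logging helpers match the op.upsert and log.severe calls below); the dimension and metric lists are reconstructed from the row fields the function reads, while the table names are illustrative placeholders:

import csv
from io import StringIO

import requests
from fivetran_connector_sdk import Logging as log, Operations as op

# Dimensions shared by both reports (match the row keys read below).
DIMENSIONS = [
    "adgroup", "adgroup_network", "app", "app_token", "campaign",
    "campaign_network", "channel", "country", "creative",
    "creative_network", "currency", "day", "network", "partner_name",
    "source_network", "store_type",
]

# Metric sets requested per report (match the row keys read below).
CSV_REPORT_METRICS = [
    "clicks", "impressions", "installs", "cost", "ad_revenue",
    "revenue", "att_status_authorized", "att_status_denied",
]
SKAD_REPORT_METRICS = [
    "skad_installs", "skad_total_installs", "valid_conversions",
    "conversion_1", "conversion_2", "conversion_3",
    "conversion_4", "conversion_5", "conversion_6",
]

# Destination table names: placeholders, not taken from the source file.
CSV_REPORT_TABLE_NAME = "adjust_csv_report"
SKAD_REPORT_TABLE_NAME = "adjust_skad_report"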


def update(configuration: dict, state: dict):
    ADJUST_API_URL = "https://automate.adjust.com/reports-service/csv_report"
    AD_SPEND_MODE = "network"
    DATE_PERIOD = "2023-04-01:-0d"
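    # "-0d" is Adjust's relative-date shorthand for "today", so this pins
    # the window from 2023-04-01 through the current day on every sync.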
    # Use .get() so a missing key falls through to the guards below
    # instead of raising KeyError before they can run.
    API_KEY = configuration.get("API_KEY")
    APP_TOKEN = configuration.get("APP_TOKEN")

    if not API_KEY:
        log.severe("Api key not provided")
        return
    if not APP_TOKEN:
        log.severe("App token not provided")
        return
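    # Note: `state` is never consulted; each run re-pulls the full window
    # above rather than checkpointing incrementally.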

    # CSV REPORT
    try:
        headers = {
            "Authorization": f"Bearer {API_KEY}",
        }

        params = {
            "ad_spend_mode": AD_SPEND_MODE,
            "app_token__in": APP_TOKEN,
            "date_period": DATE_PERIOD,
            "dimensions": ",".join(DIMENSIONS),
            "metrics": ",".join(CSV_REPORT_METRICS),
        }
        log.info("Fetching data from Adjust for CSV report...")
        response = requests.get(ADJUST_API_URL, headers=headers, params=params)

        log.info(f"Received response with status code: {response.status_code}")
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        log.severe(f"HTTP error occurred: {err}")
        log.severe(f"Response content: {response.text}")
        # Abort: without a valid response there is nothing to parse below.
        return
    except Exception as err:
        log.severe(f"Other error occurred: {err}")
        return

    try:
        log.info("Processing Adjust data...")
        csv_reader = csv.DictReader(StringIO(response.text))
        report_data = list(csv_reader)

        log.info("Upserting rows to bigquery...")
        for row in report_data:
            yield op.upsert(
                table=CSV_REPORT_TABLE_NAME,
                data={
                    # Dimensions
                    "adgroup": row["adgroup"],
                    "adgroup_network": row["adgroup_network"],
                    "app": row["app"],
                    "app_token": row["app_token"],
                    "campaign": row["campaign"],
                    "campaign_network": row["campaign_network"],
                    "channel": row["channel"],
                    "country": row["country"],
                    "creative": row["creative"],
                    "creative_network": row["creative_network"],
                    "currency": row["currency"],
                    "day": row["day"],
                    "network": row["network"],
                    "partner_name": row["partner_name"],
                    "source_network": row["source_network"],
                    "store_type": row["store_type"],
                    # Metrics
                    "clicks": int(row["clicks"]),
                    "impressions": int(row["impressions"]),
                    "installs": int(row["installs"]),
                    "cost": float(row["cost"]),
                    "ad_revenue": float(row["ad_revenue"]),
                    "revenue": float(row["revenue"]),
                    "att_status_authorized": int(row["att_status_authorized"]),
                    "att_status_denied": int(row["att_status_denied"]),
                },
            )
        log.info(f"Completed upserting {len(report_data)} rows to BigQuery.")

    # The report body is CSV, not JSON, so json.JSONDecodeError could never
    # be raised here; catch CSV parsing and field-conversion errors instead.
    except (csv.Error, KeyError, ValueError) as parse_err:
        log.severe(f"CSV parsing error: {parse_err}")
        log.severe(f"Raw response content: {response.text}")
    except Exception as err:
        log.severe(f"Other error occurred: {err}")

    # SKAD REPORT
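    # Same fetch/parse/upsert flow as the CSV report above, but requesting
    # the SKAdNetwork metric set instead.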
    try:
        headers = {
            "Authorization": f"Bearer {API_KEY}",
        }

        params = {
            "ad_spend_mode": AD_SPEND_MODE,
            "app_token__in": APP_TOKEN,
            "date_period": DATE_PERIOD,
            "dimensions": ",".join(DIMENSIONS),
            "metrics": ",".join(SKAD_REPORT_METRICS),
        }
        log.info("Fetching data from Adjust for SKAD report...")
        response = requests.get(ADJUST_API_URL, headers=headers, params=params)

        log.info(f"Received response with status code: {response.status_code}")
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        log.severe(f"HTTP error occurred: {err}")
        log.severe(f"Response content: {response.text}")
        return
    except Exception as err:
        log.severe(f"Other error occurred: {err}")
        return

    try:
        log.info("Processing Adjust data...")
        csv_reader = csv.DictReader(StringIO(response.text))
        report_data = list(csv_reader)

        log.info("Upserting rows to bigquery...")
        for row in report_data:
            yield op.upsert(
                table=SKAD_REPORT_TABLE_NAME,
                data={
                    # Dimensions
                    "adgroup": row["adgroup"],
                    "adgroup_network": row["adgroup_network"],
                    "app": row["app"],
                    "app_token": row["app_token"],
                    "campaign": row["campaign"],
                    "campaign_network": row["campaign_network"],
                    "channel": row["channel"],
                    "country": row["country"],
                    "creative": row["creative"],
                    "creative_network": row["creative_network"],
                    "currency": row["currency"],
                    "day": row["day"],
                    "network": row["network"],
                    "partner_name": row["partner_name"],
                    "source_network": row["source_network"],
                    "store_type": row["store_type"],
                    # Metrics
                    "skad_installs": int(row["skad_installs"]),
                    "skad_total_installs": int(row["skad_total_installs"]),
                    "valid_conversions": int(row["valid_conversions"]),
                    "conversion_1": int(row["conversion_1"]),
                    "conversion_2": int(row["conversion_2"]),
                    "conversion_3": int(row["conversion_3"]),
                    "conversion_4": int(row["conversion_4"]),
                    "conversion_5": int(row["conversion_5"]),
                    "conversion_6": int(row["conversion_6"]),
                },
            )
        log.info(f"Completed upserting {len(report_data)} rows to BigQuery.")

    except (csv.Error, KeyError, ValueError) as parse_err:
        log.severe(f"CSV parsing error: {parse_err}")
        log.severe(f"Raw response content: {response.text}")
    except Exception as err:
        log.severe(f"Other error occurred: {err}")