def _prepare_config_for_reporting()

in src/translation/dags/controller_dag.py [0:0]


def _prepare_config_for_reporting(ti, **kwargs) -> None:
    event_json = kwargs["dag_run"].conf
    event_type = event_json["message"]["attributes"]["eventType"]
    if event_type == "OBJECT_FINALIZE":
        reporting_config = ti.xcom_pull(key="config", task_ids="load_config")
    elif event_type == "TRANSFER_RUN_FINISHED":
        fetch_unique_id_query = "select unique_id from {bq_transfer_tracking_table} where transfer_config_id = '{transfer_config_id}'"
        config = json.loads(base64.b64decode(event_json["message"]["data"]))
        transfer_config_id = config["name"].split("/")[-3]
        data_source_id = config["dataSourceId"]
        if data_source_id == "on_premises":
            source = "teradata"
            bq_transfer_tracking_table = (
                f"{PROJECT_ID}.dmt_logs.dmt_teradata_transfer_tracking"
            )
        elif data_source_id == "redshift":
            source = "redshift"
            bq_transfer_tracking_table = (
                f"{PROJECT_ID}.dmt_logs.dmt_redshift_transfer_tracking"
            )
        # get unique id from transfer tracking table, using transfer config id
        query_job = client.query(
            fetch_unique_id_query.format(
                bq_transfer_tracking_table=bq_transfer_tracking_table,
                transfer_config_id=transfer_config_id,
            )
        )
        unique_id = [row["unique_id"] for row in query_job][0]
        reporting_config = {"unique_id": unique_id, "source": source}
    ti.xcom_push(key="reporting_config", value=reporting_config)