in src/translation/dags/controller_dag.py [0:0]
def _prepare_config_for_reporting(ti, **kwargs) -> None:
    """Build the reporting config for the triggering event and push it to XCom.

    The event payload is read from ``kwargs["dag_run"].conf`` and dispatched
    on ``message.attributes.eventType``:

    * ``OBJECT_FINALIZE`` -- the config already stored in XCom by the
      ``load_config`` task (key ``config``) is reused as-is.
    * ``TRANSFER_RUN_FINISHED`` -- ``message.data`` is base64-decoded and
      parsed as JSON; the ``unique_id`` for the transfer config is looked up
      in the per-source transfer tracking table and combined with the source
      name.

    The result is pushed to XCom under the key ``reporting_config``.

    Args:
        ti: Task instance used for ``xcom_pull`` / ``xcom_push``.
        **kwargs: Task context; must contain ``dag_run`` with a ``conf`` dict.

    Raises:
        ValueError: If the event type or the transfer's data source id is not
            one of the supported values.
        LookupError: If the tracking table holds no row for the transfer
            config id.
    """
    event_json = kwargs["dag_run"].conf
    event_type = event_json["message"]["attributes"]["eventType"]

    if event_type == "OBJECT_FINALIZE":
        reporting_config = ti.xcom_pull(key="config", task_ids="load_config")
    elif event_type == "TRANSFER_RUN_FINISHED":
        fetch_unique_id_query = "select unique_id from {bq_transfer_tracking_table} where transfer_config_id = '{transfer_config_id}'"
        config = json.loads(base64.b64decode(event_json["message"]["data"]))
        # The transfer config id is the third-from-last segment of the run
        # name (presumably ".../transferConfigs/<id>/runs/<run_id>" -- confirm
        # against the notification payload).
        transfer_config_id = config["name"].split("/")[-3]
        data_source_id = config["dataSourceId"]

        # Maps the transfer's data source id to the source name used both in
        # the reporting config and in the tracking table name.
        source_by_data_source_id = {
            "on_premises": "teradata",
            "redshift": "redshift",
        }
        source = source_by_data_source_id.get(data_source_id)
        if source is None:
            # Fail with a clear message instead of an UnboundLocalError later.
            raise ValueError(
                f"Unsupported dataSourceId for reporting: {data_source_id!r}"
            )
        bq_transfer_tracking_table = (
            f"{PROJECT_ID}.dmt_logs.dmt_{source}_transfer_tracking"
        )

        # Get unique id from transfer tracking table, using transfer config id.
        # NOTE(review): the query is assembled via string formatting; if the
        # event payload can be influenced by an untrusted party, switch the
        # transfer_config_id filter to a BigQuery query parameter.
        query_job = client.query(
            fetch_unique_id_query.format(
                bq_transfer_tracking_table=bq_transfer_tracking_table,
                transfer_config_id=transfer_config_id,
            )
        )
        # Only the first matching row is needed; avoid materialising them all.
        first_row = next(iter(query_job), None)
        if first_row is None:
            raise LookupError(
                f"No unique_id found in {bq_transfer_tracking_table} "
                f"for transfer_config_id {transfer_config_id!r}"
            )
        reporting_config = {"unique_id": first_row["unique_id"], "source": source}
    else:
        # Previously an unknown event type surfaced as an UnboundLocalError
        # on the xcom_push below.
        raise ValueError(f"Unsupported eventType for reporting: {event_type!r}")

    ti.xcom_push(key="reporting_config", value=reporting_config)