in src/datamigration/dags/redshift/redshift_transfer_run_log_dag.py [0:0]
def _get_transfer_run_summary(ti) -> None:
    """
    Fetch a DTS transfer run summary and publish it to XCom.

    Calls the DTS API (projects.locations.transferConfigs.runs.get) for the
    run named by the ``full_run_id`` XCom value, copies selected fields of the
    response into the ``dts_run_summary_json`` dict pulled from the
    ``load_parameters`` task, and pushes the updated dict back to XCom under
    the key ``dts_run_summary_json``.

    Args:
        ti: Task instance used for ``xcom_pull`` / ``xcom_push``.

    Raises:
        HttpError: Re-raised, after logging, when the API request fails.
        KeyError: If the API response has no ``state`` field.
    """
    transfer_config_id = ti.xcom_pull(
        key="transfer_config_id", task_ids="load_parameters"
    )
    run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    full_run_id = ti.xcom_pull(key="full_run_id", task_ids="load_parameters")
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    dts_run_summary_json = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="load_parameters"
    )

    # Keep the try body limited to the API call: it is the only statement
    # expected to raise HttpError.
    try:
        transfer_summary = (
            bq_data_transfer_client.projects()
            .locations()
            .transferConfigs()
            .runs()
            .get(name=full_run_id)
            .execute()
        )
    except HttpError:
        # logging.exception records the message together with the traceback.
        logging.exception("Error while requesting transfer run summary")
        raise

    logging.info(
        "Method _get_transfer_run_summary: received transfer run summary as %s",
        transfer_summary,
    )

    # NOTE(review): errorStatus, startTime and endTime are read defensively on
    # the assumption that the API can omit them (e.g. a run without an error or
    # one that has not finished) - confirm against the TransferRun resource
    # documentation. Missing values are recorded as None instead of failing
    # the task with a KeyError.
    error_status = transfer_summary.get("errorStatus") or {}

    dts_run_summary_json["unique_id"] = unique_id
    dts_run_summary_json["transfer_config_id"] = transfer_config_id
    dts_run_summary_json["transfer_run_id"] = run_id
    dts_run_summary_json["transfer_run_status"] = transfer_summary["state"]
    dts_run_summary_json["error_message"] = error_status.get("message")
    dts_run_summary_json["start_time"] = transfer_summary.get("startTime")
    dts_run_summary_json["end_time"] = transfer_summary.get("endTime")
    ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)