def _get_transfer_run_summary()

in src/datamigration/dags/redshift/redshift_transfer_run_log_dag.py [0:0]


def _get_transfer_run_summary(ti) -> None:
    """
    Fetch the DTS transfer run summary and publish it via XCom.

    Calls the DTS API method projects.locations.transferConfigs.runs.get for
    the run named by the ``full_run_id`` XCom value, copies the fields of
    interest into the ``dts_run_summary_json`` dict pulled from the
    ``load_parameters`` task, and pushes the updated dict back under the
    ``dts_run_summary_json`` key.

    Args:
        ti: Task instance used to pull the input XCom values (all pulled from
            the ``load_parameters`` task) and to push the resulting summary.

    Raises:
        HttpError: if the DTS API request fails; the error is logged and
            re-raised unchanged.
        KeyError: if the API response carries no ``state`` field.
    """
    transfer_config_id = ti.xcom_pull(
        key="transfer_config_id", task_ids="load_parameters"
    )
    run_id = ti.xcom_pull(key="transfer_run_id", task_ids="load_parameters")
    full_run_id = ti.xcom_pull(key="full_run_id", task_ids="load_parameters")
    unique_id = ti.xcom_pull(key="unique_id", task_ids="load_parameters")
    dts_run_summary_json = ti.xcom_pull(
        key="dts_run_summary_json", task_ids="load_parameters"
    )
    # A pull that finds no value yields None; start from an empty summary
    # instead of failing with a TypeError on the first item assignment.
    if dts_run_summary_json is None:
        dts_run_summary_json = {}

    # Only the API request is guarded: it is the one call expected to raise
    # HttpError.
    try:
        transfer_summary = (
            bq_data_transfer_client.projects()
            .locations()
            .transferConfigs()
            .runs()
            .get(name=full_run_id)
            .execute()
        )
    except HttpError as err:
        logging.error("Error while requesting transfer run summary")
        logging.exception(err)
        # Bare raise keeps the original traceback intact.
        raise

    logging.info(
        "Method _get_transfer_run_summary: received transfer run summary as %s",
        transfer_summary,
    )

    # NOTE(review): errorStatus / startTime / endTime are treated as optional
    # because a run that has not failed or not finished may omit them -
    # confirm against the TransferRun resource documentation.
    error_status = transfer_summary.get("errorStatus") or {}

    dts_run_summary_json["unique_id"] = unique_id
    dts_run_summary_json["transfer_config_id"] = transfer_config_id
    dts_run_summary_json["transfer_run_id"] = run_id
    dts_run_summary_json["transfer_run_status"] = transfer_summary["state"]
    dts_run_summary_json["error_message"] = error_status.get("message")
    dts_run_summary_json["start_time"] = transfer_summary.get("startTime")
    dts_run_summary_json["end_time"] = transfer_summary.get("endTime")
    ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)