def _load_parameters()

in src/datamigration/dags/teradata/teradata_transfer_run_log_dag.py [0:0]


def _load_parameters(ti, **kwargs) -> None:
    """
    Load necessary variables for downstream tasks
    """
    if kwargs["dag_run"].conf is not None:
        full_run_id = ast.literal_eval(kwargs["dag_run"].conf["config"])[
            "TRANSFER_RUN_ID"
        ]
        run_state = ast.literal_eval(kwargs["dag_run"].conf["config"])[
            "TRANSFER_RUN_STATE"
        ]

    else:
        logging.error(
            "Method _load_parameters: full_run_id not provided in DAG conf.\n \
            Please provide full_run_id in form 'projects/{projectId}/transferConfigs/{configId}/runs/{run_id}'\n \
            or 'projects/{projectId}/locations/{locationId}/transferConfigs/{configId}/runs/{run_id}'"
        )
        raise Exception(
            "Method _load_parameters: full_run_id not provided in DAG conf.\n \
            Please provide full_run_id in form 'projects/{projectId}/transferConfigs/{configId}/runs/{run_id}'\n \
            or 'projects/{projectId}/locations/{locationId}/transferConfigs/{configId}/runs/{run_id}'"
        )

    (
        _,
        _,
        transfer_config_id,
        transfer_run_id,
    ) = dts_logs_utils.parse_full_transfer_runID(full_run_id)
    tracking_info = dts_logs_utils.get_tracking_info(
        transfer_config_id, BQ_TRANSFER_TRACKING_TABLE_NAME
    )
    (unique_id, agent_id, config_bucket_id, config_object_path,) = (
        tracking_info["unique_id"],
        tracking_info["agent_id"],
        tracking_info["config_bucket_id"],
        tracking_info["config_object_path"],
    )
    data_transfer_config_json = gcs_util.read_object_from_gcsbucket(
        config_bucket_id, config_object_path
    )

    job_stats_json = bq_result_tbl_utils.get_dts_run_job_stats_template(
        unique_id=unique_id,
        transfer_config_id=transfer_config_id,
        transfer_run_id=transfer_run_id,
        agent_id=agent_id,
    )
    dts_run_summary_json = bq_result_tbl_utils.get_dts_run_summary_template(
        unique_id=unique_id,
        transfer_config_id=transfer_config_id,
        transfer_run_id=transfer_run_id,
        agent_id=agent_id,
    )

    ti.xcom_push(key="unique_id", value=unique_id)
    ti.xcom_push(key="transfer_config_id", value=transfer_config_id)
    ti.xcom_push(key="transfer_run_id", value=transfer_run_id)
    ti.xcom_push(key="full_run_id", value=full_run_id)
    ti.xcom_push(key="run_state", value=run_state)
    ti.xcom_push(key="job_stats_json", value=job_stats_json)
    ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)
    ti.xcom_push(key="data_transfer_config_json", value=data_transfer_config_json)