in src/datamigration/dags/teradata/teradata_transfer_run_log_dag.py [0:0]
def _load_parameters(ti, **kwargs) -> None:
"""
Load necessary variables for downstream tasks
"""
if kwargs["dag_run"].conf is not None:
full_run_id = ast.literal_eval(kwargs["dag_run"].conf["config"])[
"TRANSFER_RUN_ID"
]
run_state = ast.literal_eval(kwargs["dag_run"].conf["config"])[
"TRANSFER_RUN_STATE"
]
else:
logging.error(
"Method _load_parameters: full_run_id not provided in DAG conf.\n \
Please provide full_run_id in form 'projects/{projectId}/transferConfigs/{configId}/runs/{run_id}'\n \
or 'projects/{projectId}/locations/{locationId}/transferConfigs/{configId}/runs/{run_id}'"
)
raise Exception(
"Method _load_parameters: full_run_id not provided in DAG conf.\n \
Please provide full_run_id in form 'projects/{projectId}/transferConfigs/{configId}/runs/{run_id}'\n \
or 'projects/{projectId}/locations/{locationId}/transferConfigs/{configId}/runs/{run_id}'"
)
(
_,
_,
transfer_config_id,
transfer_run_id,
) = dts_logs_utils.parse_full_transfer_runID(full_run_id)
tracking_info = dts_logs_utils.get_tracking_info(
transfer_config_id, BQ_TRANSFER_TRACKING_TABLE_NAME
)
(unique_id, agent_id, config_bucket_id, config_object_path,) = (
tracking_info["unique_id"],
tracking_info["agent_id"],
tracking_info["config_bucket_id"],
tracking_info["config_object_path"],
)
data_transfer_config_json = gcs_util.read_object_from_gcsbucket(
config_bucket_id, config_object_path
)
job_stats_json = bq_result_tbl_utils.get_dts_run_job_stats_template(
unique_id=unique_id,
transfer_config_id=transfer_config_id,
transfer_run_id=transfer_run_id,
agent_id=agent_id,
)
dts_run_summary_json = bq_result_tbl_utils.get_dts_run_summary_template(
unique_id=unique_id,
transfer_config_id=transfer_config_id,
transfer_run_id=transfer_run_id,
agent_id=agent_id,
)
ti.xcom_push(key="unique_id", value=unique_id)
ti.xcom_push(key="transfer_config_id", value=transfer_config_id)
ti.xcom_push(key="transfer_run_id", value=transfer_run_id)
ti.xcom_push(key="full_run_id", value=full_run_id)
ti.xcom_push(key="run_state", value=run_state)
ti.xcom_push(key="job_stats_json", value=job_stats_json)
ti.xcom_push(key="dts_run_summary_json", value=dts_run_summary_json)
ti.xcom_push(key="data_transfer_config_json", value=data_transfer_config_json)