in src/translation/dags/translation_utils/ddl_extraction_utils/build_teradata_ddl_extraction_group.py [0:0]
def _prepare_arguments(ti, **kwargs) -> None:
    """Assemble the bash script argument string and publish it through XCom.

    The run configuration is read from ``kwargs["dag_run"].conf["config"]``,
    a Python-literal string whose top-level ``"config"`` entry holds the
    settings. From it a comma-separated ``key:value`` connection string is
    built, followed (space-separated) by the ``gcsSourcePath`` value, the
    lower-cased source type and a freshly generated UUID folder name.

    Args:
        ti: Task instance; only its ``xcom_push`` method is used.
        **kwargs: Task context; only the ``dag_run`` entry is read.

    XCom keys pushed:
        arg_string: The complete space-separated argument string.
        metadata_folder_name: The generated UUID string.
        config: ``json.dumps`` of the raw ``conf["config"]`` value.
    """
    raw_config = kwargs["dag_run"].conf["config"]
    settings = ast.literal_eval(raw_config)["config"]
    source_config = settings["validation_config"]["source_config"]
    source_kind = source_config["source_type"].lower()

    # Each entry is one "key:value" fragment; they are comma-joined below.
    fragments = ["connector:" + source_kind]
    fragments.append("host:" + source_config["host"])
    fragments.append("user:" + source_config["user-name"])

    # A password carrying the secret prefix names a Variable that holds the
    # real value; anything else is used verbatim.
    secret = source_config["password"]
    if secret.startswith(constants.SECRET_PREFIX):
        variable_key = secret.removeprefix(constants.SECRET_PREFIX)
        secret = Variable.get(variable_key)
    # NOTE(review): the resolved password ends up in plain text inside
    # arg_string, which is pushed to XCom below -- confirm this is acceptable.
    fragments.append("password:" + secret)

    translation_details = settings["migrationTask"]["translationConfigDetails"]
    # Only the first name-map entry is consulted; its source schema is used
    # for both the database and the schema fragments.
    first_mapping = translation_details["nameMappingList"]["name_map"][0]
    schema_name = first_mapping["source"]["schema"]
    fragments.append("database:" + schema_name)
    fragments.append("schema:" + schema_name)
    fragments.append("port:" + str(source_config["port"]))
    fragments.append(f"driver:{CP_SCRIPT_FOLDER}/terajdbc4.jar")
    connection_args = ",".join(fragments)

    metadata_storage_path = translation_details["gcsSourcePath"]
    metadata_folder_name = str(uuid.uuid4())
    arg_string = " ".join(
        (connection_args, metadata_storage_path, source_kind, metadata_folder_name)
    )

    logging.info(
        f"Bash script arguments are prepared and metadata_folder_name is {metadata_folder_name}"
    )

    ti.xcom_push(key="arg_string", value=arg_string)
    ti.xcom_push(key="metadata_folder_name", value=metadata_folder_name)
    ti.xcom_push(key="config", value=json.dumps(raw_config))