def _prepare_arguments()

in src/translation/dags/translation_utils/ddl_extraction_utils/build_teradata_ddl_extraction_group.py [0:0]


def _prepare_arguments(ti, **kwargs) -> None:
    config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]

    source_config = config["validation_config"]["source_config"]

    source_db = source_config["source_type"].lower()
    connection_args = "connector:" + source_db

    connection_args += ",host:" + source_config["host"]
    connection_args += ",user:" + source_config["user-name"]

    password = source_config["password"]
    if password.startswith(constants.SECRET_PREFIX):
        password = Variable.get(password.removeprefix(constants.SECRET_PREFIX))
    connection_args += ",password:" + password

    source_schema = config["migrationTask"]["translationConfigDetails"][
        "nameMappingList"
    ]["name_map"][0]["source"]["schema"]
    connection_args += ",database:" + source_schema
    connection_args += ",schema:" + source_schema
    connection_args += ",port:" + str(source_config["port"])
    connection_args += f",driver:{CP_SCRIPT_FOLDER}/terajdbc4.jar"

    metadata_storage_path = config["migrationTask"]["translationConfigDetails"][
        "gcsSourcePath"
    ]

    metadata_folder_name = str(uuid.uuid4())

    arg_string = (
        connection_args
        + " "
        + metadata_storage_path
        + " "
        + source_db
        + " "
        + metadata_folder_name
    )

    logging.info(
        f"Bash script arguments are prepared and metadata_folder_name is {metadata_folder_name}"
    )

    ti.xcom_push(key="arg_string", value=arg_string)
    ti.xcom_push(key="metadata_folder_name", value=metadata_folder_name)
    ti.xcom_push(key="config", value=json.dumps(kwargs["dag_run"].conf["config"]))