def _extract_ddl()

in src/translation/dags/translation_utils/ddl_extraction_utils/build_oracle_ddl_extraction_group.py [0:0]


def _extract_ddl(ti, **kwargs):
    jsonString = json.dumps(kwargs["dag_run"].conf["config"])
    ti.xcom_push(key="next_dag_config", value=jsonString)
    config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]
    try:
        user = config["validation_config"]["source_config"]["user"]
        password = config["validation_config"]["source_config"]["password"]
        host = config["validation_config"]["source_config"]["host"]
        port = config["validation_config"]["source_config"]["port"]
        serviceName = config["validation_config"]["source_config"]["database"]
        if password.startswith(constants.SECRET_PREFIX):
            password = Variable.get(password.removeprefix(constants.SECRET_PREFIX))
        con = oracledb.connect(
            user=user, password=password, host=host, port=port, service_name=serviceName
        )
        logging.info("Connected")
        translationSourcePath = config["migrationTask"]["translationConfigDetails"][
            "gcsSourcePath"
        ]
        (
            bucket,
            sourceFolder,
        ) = storage_utils.StorageUtils().parse_bucket_and_blob_from_path(
            translationSourcePath
        )
        sourceSchema = config["migrationTask"]["translationConfigDetails"][
            "nameMappingList"
        ]["name_map"][0]["source"]["schema"]
        cursor = con.cursor()

    except oracledb.DatabaseError as e:
        logging.info("Connection to oracle failed")
        logging.info(str(e))
        raise Exception(str(e))
    else:
        try:
            con.outputtypehandler = output_type_handler
            query = """