in src/translation/dags/translation_utils/ddl_extraction_utils/build_oracle_ddl_extraction_group.py [0:0]
def _extract_ddl(ti, **kwargs):
jsonString = json.dumps(kwargs["dag_run"].conf["config"])
ti.xcom_push(key="next_dag_config", value=jsonString)
config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]
try:
user = config["validation_config"]["source_config"]["user"]
password = config["validation_config"]["source_config"]["password"]
host = config["validation_config"]["source_config"]["host"]
port = config["validation_config"]["source_config"]["port"]
serviceName = config["validation_config"]["source_config"]["database"]
if password.startswith(constants.SECRET_PREFIX):
password = Variable.get(password.removeprefix(constants.SECRET_PREFIX))
con = oracledb.connect(
user=user, password=password, host=host, port=port, service_name=serviceName
)
logging.info("Connected")
translationSourcePath = config["migrationTask"]["translationConfigDetails"][
"gcsSourcePath"
]
(
bucket,
sourceFolder,
) = storage_utils.StorageUtils().parse_bucket_and_blob_from_path(
translationSourcePath
)
sourceSchema = config["migrationTask"]["translationConfigDetails"][
"nameMappingList"
]["name_map"][0]["source"]["schema"]
cursor = con.cursor()
except oracledb.DatabaseError as e:
logging.info("Connection to oracle failed")
logging.info(str(e))
raise Exception(str(e))
else:
try:
con.outputtypehandler = output_type_handler
query = """