in src/translation/dags/translation_utils/ddl_extraction_utils/build_redshift_ddl_extraction_group.py [0:0]
def _extract_redshift_ddl(ti, **kwargs):
    config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]
    source_config = config["validation_config"]["source_config"]
    hostname = source_config["host"].lower()
    dbname = source_config["database"].lower()
    portno = source_config["port"]
    username = source_config["user"].lower()
    password = source_config["password"]
    # Resolve the password from an Airflow Variable when it is stored as a secret reference.
    if password.startswith(constants.SECRET_PREFIX):
        password = Variable.get(password.removeprefix(constants.SECRET_PREFIX))
    source_schema = config["migrationTask"]["translationConfigDetails"][
        "nameMappingList"
    ]["name_map"][0]["source"]["schema"]
    translation_source_path = config["migrationTask"]["translationConfigDetails"][
        "gcsSourcePath"
    ]
    bucket, folder = gcs_util.parse_bucket_and_blob_from_path(translation_source_path)
    op_type = config["type"].lower()
    # SQL/DML translations keep the extracted DDL under a dedicated metadata folder.
    if op_type in ("sql", "dml"):
        folder = folder + "/" + METADATA_FOLDER
    conn = redshift_connector.connect(
        host=hostname, database=dbname, port=portno, user=username, password=password
    )
    cursor = conn.cursor()
    # List the tables in the source schema that are owned by the connecting user.
    cursor.execute(
        f"select tablename from pg_tables where schemaname='{source_schema}' and tableowner='{username}'"
    )
    # Retrieve the query result set
    result: tuple = cursor.fetchall()
    redshift_ddl_extraction_result = []
    for table in result:
        exec_time = datetime.utcnow()
        # SHOW TABLE returns the CREATE TABLE statement for the table.
        cursor.execute(f"show table {source_schema}.{table[0]}")
        table_ddl: tuple = cursor.fetchall()
        logging.info(f"DDL extraction for table - {str(table_ddl[0][0])}")
        filename = source_schema + "_" + table[0] + ".sql"
        object_name = folder + "/" + filename
        # Write the extracted DDL to the GCS translation source path.
        gcs_util.write_object_in_gcsbucket(
            bucket, object_name, str.encode(table_ddl[0][0])
        )
        redshift_ddl_extraction_result.append(
            {
                "unique_id": config["unique_id"],
                "source_db": "Redshift",
                "database": dbname,
                "schema": source_schema,
                "table_name": table[0],
                "file_path": f"gs://{bucket}/{object_name}",
                "extraction_time": str(exec_time),
            }
        )
logging.info(f"Lentgth of records - {len(redshift_ddl_extraction_result)}")
if len(redshift_ddl_extraction_result) > 0:
insert_result = bigquery.Client().insert_rows_json(
DDL_EXTRACTION_TABLE, redshift_ddl_extraction_result
)
logging.info(f"extraction insertion result: {insert_result}")
logging.info(
f"Redshift DDL are stored in source path {translation_source_path}"
)
ti.xcom_push(key="config", value=json.dumps(kwargs["dag_run"].conf["config"]))