in src/translation/dags/translation_utils/ddl_extraction_utils/build_teradata_ddl_extraction_group.py [0:0]
def _store_ddl(ti, **kwargs) -> None:
    # Folder name of the extracted Teradata metadata, pushed by the prepare_arguments task.
    metadata_folder_name = ti.xcom_pull(
        key="metadata_folder_name",
        task_ids="teradata_extraction_taskgroup.prepare_arguments",
    )
    logging.info(f"In store ddl: {metadata_folder_name}")
    config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]
    source_schema = config["migrationTask"]["translationConfigDetails"][
        "nameMappingList"
    ]["name_map"][0]["source"]["schema"]
    translation_source_path = config["migrationTask"]["translationConfigDetails"][
        "gcsSourcePath"
    ]
    bucket, folder = gcs_util.parse_bucket_and_blob_from_path(translation_source_path)
    op_type = config["type"].lower()
    if op_type in ("sql", "dml"):
        folder = folder + "/" + METADATA_FOLDER
    csv_file_path = f"{DATA_FOLDER}/{metadata_folder_name}/dbc.TablesV.csv"
    td_ddl_extraction_result = []
    # Read the extracted dbc.TablesV CSV; each row holds one table's DDL (RequestText).
    with open(csv_file_path, mode="r") as ddl_file:
        csv_table_v_reader = csv.reader(ddl_file)
        headers = next(csv_table_v_reader)
        for line in csv_table_v_reader:
            exec_time = datetime.utcnow()
            row_dict = csv_utils.row_to_dict(headers, line)
            logging.info(f"DDL for - {row_dict['TableName']}")
            # Write each table's DDL to GCS as <database>_<table>.sql under the translation source path.
            filename = row_dict["DataBaseName"] + "_" + row_dict["TableName"] + ".sql"
            object_name = folder + "/" + filename
            gcs_util.write_object_in_gcsbucket(
                bucket, object_name, row_dict["RequestText"]
            )
            td_ddl_extraction_result.append(
                {
                    "unique_id": config["unique_id"],
                    "source_db": "Teradata",
                    "database": source_schema,
                    "schema": source_schema,
                    "table_name": row_dict["TableName"],
                    "file_path": f"gs://{bucket}/{object_name}",
                    "extraction_time": str(exec_time),
                }
            )
    # Record every extracted DDL file in the BigQuery extraction-tracking table.
    insert_result = bigquery.Client().insert_rows_json(
        DDL_EXTRACTION_TABLE, td_ddl_extraction_result
    )
    logging.info(f"extraction insertion result: {insert_result}")
    logging.info(f"Teradata DDLs are stored in source path {translation_source_path}")
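The function relies on project helpers from gcs_util and csv_utils that are not shown in this file. The following is a minimal sketch of what the call sites above appear to assume; the bodies are illustrative guesses, not the project's actual implementations.

# Illustrative sketches only; the real gcs_util / csv_utils helpers may differ.
from google.cloud import storage


def parse_bucket_and_blob_from_path(gcs_path: str):
    # "gs://my-bucket/some/prefix" -> ("my-bucket", "some/prefix")
    bucket_name, _, blob_prefix = gcs_path.removeprefix("gs://").partition("/")
    return bucket_name, blob_prefix


def write_object_in_gcsbucket(bucket_name: str, object_name: str, data: str) -> None:
    # Upload a string payload (here: one table's DDL text) as a GCS object.
    storage.Client().bucket(bucket_name).blob(object_name).upload_from_string(data)


def row_to_dict(headers, row):
    # Pair the CSV header row with a data row so columns can be read by name.
    return dict(zip(headers, row))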