def _store_ddl()

in src/translation/dags/translation_utils/ddl_extraction_utils/build_teradata_ddl_extraction_group.py [0:0]


def _store_ddl(ti, **kwargs) -> None:
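    """Store extracted Teradata DDL.

    Reads the dbc.TablesV CSV export for the run's metadata folder, writes one
    .sql file per table to the translation job's GCS source path, and records
    an audit row per table in the DDL extraction BigQuery table.
    """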
    metadata_folder_name = ti.xcom_pull(
        key="metadata_folder_name",
        task_ids="teradata_extraction_taskgroup.prepare_arguments",
    )
    logging.info(f"In store ddl : {metadata_folder_name}")

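    # The DAG run conf carries the translation config serialized as a Python literal.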
    config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]

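    # The source Teradata schema is taken from the first entry in the name mapping list.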
    source_schema = config["migrationTask"]["translationConfigDetails"][
        "nameMappingList"
    ]["name_map"][0]["source"]["schema"]

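    # Extracted DDL files are written back to the translation job's GCS source path.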
    translation_source_path = config["migrationTask"]["translationConfigDetails"][
        "gcsSourcePath"
    ]
    bucket, folder = gcs_util.parse_bucket_and_blob_from_path(translation_source_path)

    op_type = config["type"].lower()
    if op_type in ("sql", "dml"):
        # For SQL/DML translation jobs the DDL goes under a dedicated metadata subfolder.
        folder = folder + "/" + METADATA_FOLDER

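    # Each row of the dbc.TablesV export carries a table's CREATE statement in its RequestText column.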
    csv_file_path = f"{DATA_FOLDER}/{metadata_folder_name}/dbc.TablesV.csv"

    # Read the CSV export of dbc.TablesV produced by the extraction step.
    with open(csv_file_path, mode="r") as ddl_file:
        tablesv_reader = csv.reader(ddl_file)
        headers = next(tablesv_reader)
        tablesv_rows = list(tablesv_reader)

    # Write one DDL file per table and collect an audit row for each.
    td_ddl_extraction_result = []
    for row in tablesv_rows:
        exec_time = datetime.utcnow()

        row_dict = csv_utils.row_to_dict(headers, row)
        logging.info(f"Writing DDL for table {row_dict['TableName']}")

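        # One .sql file per table, named <DataBaseName>_<TableName>.sql.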
        filename = row_dict["DataBaseName"] + "_" + row_dict["TableName"] + ".sql"

        object_name = folder + "/" + filename
        gcs_util.write_object_in_gcsbucket(bucket, object_name, row_dict["RequestText"])

        td_ddl_extraction_result.append(
            {
                "unique_id": config["unique_id"],
                "source_db": "Teradata",
                "database": source_schema,
                "schema": source_schema,
                "table_name": row_dict["TableName"],
                "file_path": f"gs://{bucket}/{object_name}",
                "extraction_time": str(exec_time),
            }
        )

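    # insert_rows_json streams the audit rows into the tracking table and
    # returns a list of per-row errors (empty on success).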
    insert_result = bigquery.Client().insert_rows_json(
        DDL_EXTRACTION_TABLE, td_ddl_extraction_result
    )
    logging.info(f"extraction insertion result: {insert_result}")

    logging.info(f"Teradata DDL are stored in source path {translation_source_path}")