def _extract_redshift_ddl()

in src/translation/dags/translation_utils/ddl_extraction_utils/build_redshift_ddl_extraction_group.py [0:0]


def _extract_redshift_ddl(ti, **kwargs):
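    """Extract the DDL of every table in the configured Redshift schema, upload each
    statement to GCS, and record the extraction metadata in BigQuery."""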
    config = ast.literal_eval(kwargs["dag_run"].conf["config"])["config"]

    source_config = config["validation_config"]["source_config"]

    hostname = source_config["host"].lower()
    dbname = source_config["database"].lower()
    portno = source_config["port"]
    username = source_config["user"].lower()
    password = source_config["password"]
    if password.startswith(constants.SECRET_PREFIX):
        password = Variable.get(password.removeprefix(constants.SECRET_PREFIX))

    source_schema = config["migrationTask"]["translationConfigDetails"][
        "nameMappingList"
    ]["name_map"][0]["source"]["schema"]

    translation_source_path = config["migrationTask"]["translationConfigDetails"][
        "gcsSourcePath"
    ]
    bucket, folder = gcs_util.parse_bucket_and_blob_from_path(translation_source_path)

    op_type = config["type"].lower()
    if op_type in ("sql", "dml"):
        folder = f"{folder}/{METADATA_FOLDER}"

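    # Open a connection to the Redshift cluster described in the validation config.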
    conn = redshift_connector.connect(
        host=hostname, database=dbname, port=portno, user=username, password=password
    )

    cursor = conn.cursor()

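    # List the tables in the source schema that are owned by the connecting user.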
    cursor.execute(
        "select tablename from pg_tables where schemaname = %s and tableowner = %s",
        (source_schema, username),
    )

    # Retrieve the query result set
    result: tuple = cursor.fetchall()

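    # For every table, fetch its DDL with SHOW TABLE, write it to GCS as
    # <schema>_<table>.sql, and collect a metadata record for BigQuery.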
    redshift_ddl_extraction_result = []
    for table in result:
        exec_time = datetime.utcnow()

        cursor.execute(f"show table {source_schema}.{table[0]}")

        table_ddl: tuple = cursor.fetchall()
        logging.info(f"DDL extraction for table - {str(table_ddl[0][0])}")

        filename = f"{source_schema}_{table[0]}.sql"

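        # Upload the DDL statement to the translation source bucket.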
        object_name = f"{folder}/{filename}"
        gcs_util.write_object_in_gcsbucket(
            bucket, object_name, str.encode(table_ddl[0][0])
        )

        redshift_ddl_extraction_result.append(
            {
                "unique_id": config["unique_id"],
                "source_db": "Redshift",
                "database": dbname,
                "schema": source_schema,
                "table_name": table[0],
                "file_path": f"gs://{bucket}/{object_name}",
                "extraction_time": str(exec_time),
            }
        )

    logging.info(f"Lentgth of records - {len(redshift_ddl_extraction_result)}")

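    # Record one metadata row per extracted table in the BigQuery tracking table.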
    if redshift_ddl_extraction_result:
        insert_result = bigquery.Client().insert_rows_json(
            DDL_EXTRACTION_TABLE, redshift_ddl_extraction_result
        )
        logging.info(f"BigQuery insertion result: {insert_result}")

        logging.info(
            f"Redshift DDLs are stored under source path {translation_source_path}"
        )

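    # Expose the raw config to downstream tasks via XCom.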
    ti.xcom_push(key="config", value=json.dumps(kwargs["dag_run"].conf["config"]))
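
A minimal sketch of how this callable could be wired into the extraction task group. The group id, task id, and builder signature below are illustrative assumptions; only _extract_redshift_ddl itself comes from this file.

from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup


def build_redshift_ddl_extraction_group(dag):
    # Hypothetical wiring: a task group with one task that runs _extract_redshift_ddl.
    # Because the callable accepts **kwargs, Airflow 2 passes the full execution
    # context (including ti and dag_run) to it automatically.
    with TaskGroup(group_id="redshift_ddl_extraction", dag=dag) as group:
        PythonOperator(
            task_id="extract_redshift_ddl",
            python_callable=_extract_redshift_ddl,
            dag=dag,
        )
    return group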