def get_mysql_attrs()

in python/lambda-ddb-mysql-etl-pipeline/lambda/dbwrite.py [0:0]


def get_mysql_attrs(t_suffix):
    """Gets table attributes from Information Schema & creates new staging table, if necessary

    Args:
         t_suffix (str): The table name suffix of the write target table.
    Returns:
        attribute_list (list): Array of column names in the specified db table.
    """
    if db_conn is None:
        return "`mysql_attrs` ref from `get_mysql_attrs()`"
    else:
        # Gets DB Creds from AWS Secrets Manager
        try:
            session = boto3.session.Session()
            client = session.client(service_name='secretsmanager'
                , region_name=os.environ['REGION'])
            SECRET = client.get_secret_value(SecretId=os.environ['DB_SECRETS_REF'])
            if 'SecretString' in SECRET:
                SECRETS = json.loads(SECRET['SecretString'])
            else:
                SECRETS = json.loads(b64decode(SECRET['SecretBinary']))
        except Exception:
            logger.error("ERROR: Unable to GET DB Credentials from Secrets Manager")

        try:
            connection = pymysql.connect(host=SECRETS['MYSQL_ENDPOINT'], port=3306
                , user=SECRETS['MYSQL_USER'], password=SECRETS['MYSQL_PASSWD']
                , autocommit=True, connect_timeout=5)
        except pymysql.MySQLError:
            logger.error("MySQLError: MySQL Connection Issue")
        cursor_obj = connection.cursor()
        table_check = f"""
            SELECT
                COUNT(*)
            FROM information_schema.tables
            WHERE TABLE_SCHEMA = '{SCHEMA}'
            AND TABLE_NAME = 'table_{t_suffix}'
            """
        cursor_obj.execute(table_check)
        if cursor_obj.fetchone()[0] == 0:
            logger.info('No table exists!!!')
            sns = boto3.client('sns')
            response = sns.publish(
                TopicArn=os.environ['TOPIC_ARN'],
                Message=f"""A new t_suffix has been detected. A new staging table has been created with
                        relevant tuples loaded to : {SCHEMA}.table_{t_suffix}"""
            )
        
            create_table = f"""
            CREATE TABLE IF NOT EXISTS {SCHEMA}.table_{t_suffix} (
            id VARCHAR(7),
            `name` VARCHAR(12),
            created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            PRIMARY KEY (id),
            UNIQUE KEY comp_idx_id_name (id,`name`)
            )"""
            cursor_obj.execute(create_table)
        
            get_attrs = f"""
            SELECT
            COLUMN_NAME
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = '{SCHEMA}' AND TABLE_NAME = 'table_{t_suffix}'
            """
            cursor_obj.execute(get_attrs)
        
            return [column[0] for column in dwr_cursor.fetchall()]
        
        else:
            get_attrs = f"""
            SELECT
            COLUMN_NAME
            FROM INFORMATION_SCHEMA.COLUMNS
            WHERE TABLE_SCHEMA = '{SCHEMA}' AND TABLE_NAME = 'table_{t_suffix}'"""
            cursor_obj.execute(get_attrs)
            return [column[0] for column in cursor_obj.fetchall()]