in python/lambda-ddb-mysql-etl-pipeline/lambda/dbwrite.py [0:0]
def get_mysql_attrs(t_suffix):
"""Gets table attributes from Information Schema & creates new staging table, if necessary
Args:
t_suffix (str): The table name suffix of the write target table.
Returns:
attribute_list (list): Array of column names in the specified db table.
"""
if db_conn is None:
return "`mysql_attrs` ref from `get_mysql_attrs()`"
else:
# Gets DB Creds from AWS Secrets Manager
try:
session = boto3.session.Session()
client = session.client(service_name='secretsmanager'
, region_name=os.environ['REGION'])
SECRET = client.get_secret_value(SecretId=os.environ['DB_SECRETS_REF'])
if 'SecretString' in SECRET:
SECRETS = json.loads(SECRET['SecretString'])
else:
SECRETS = json.loads(b64decode(SECRET['SecretBinary']))
except Exception:
logger.error("ERROR: Unable to GET DB Credentials from Secrets Manager")
try:
connection = pymysql.connect(host=SECRETS['MYSQL_ENDPOINT'], port=3306
, user=SECRETS['MYSQL_USER'], password=SECRETS['MYSQL_PASSWD']
, autocommit=True, connect_timeout=5)
except pymysql.MySQLError:
logger.error("MySQLError: MySQL Connection Issue")
cursor_obj = connection.cursor()
table_check = f"""
SELECT
COUNT(*)
FROM information_schema.tables
WHERE TABLE_SCHEMA = '{SCHEMA}'
AND TABLE_NAME = 'table_{t_suffix}'
"""
cursor_obj.execute(table_check)
if cursor_obj.fetchone()[0] == 0:
logger.info('No table exists!!!')
sns = boto3.client('sns')
response = sns.publish(
TopicArn=os.environ['TOPIC_ARN'],
Message=f"""A new t_suffix has been detected. A new staging table has been created with
relevant tuples loaded to : {SCHEMA}.table_{t_suffix}"""
)
create_table = f"""
CREATE TABLE IF NOT EXISTS {SCHEMA}.table_{t_suffix} (
id VARCHAR(7),
`name` VARCHAR(12),
created TIMESTAMP DEFAULT CURRENT_TIMESTAMP
PRIMARY KEY (id),
UNIQUE KEY comp_idx_id_name (id,`name`)
)"""
cursor_obj.execute(create_table)
get_attrs = f"""
SELECT
COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = '{SCHEMA}' AND TABLE_NAME = 'table_{t_suffix}'
"""
cursor_obj.execute(get_attrs)
return [column[0] for column in dwr_cursor.fetchall()]
else:
get_attrs = f"""
SELECT
COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = '{SCHEMA}' AND TABLE_NAME = 'table_{t_suffix}'"""
cursor_obj.execute(get_attrs)
return [column[0] for column in cursor_obj.fetchall()]