in python/lambda-ddb-mysql-etl-pipeline/lambda/dbwrite.py [0:0]
def dynamic_mysql_crud_ops(ast_list, t_suffix, mysql_attrs):
    """Backs dataset attrs against table attrs then runs CRUD ops dynamically.

    Args:
        ast_list (str): String of s3 body data. A file-like object exposing
            ``read()`` is also accepted and will be drained once.
        t_suffix (str): The table name suffix of the write target table.
        mysql_attrs (list): Array of column names from the DB table in question.

    Returns:
        dict: Echo of the inputs when ``db_conn`` is None (dry-run mode), or
        str: A success message once the REPLACE INTO has executed.

    Notes:
        Prints results to CloudWatch Logs will work if one includes DB Credentials
        and removes `None` flag on `db_conn` herein.
    """
    if db_conn is None:
        # Dry-run mode: no DB connection configured, just log and echo the inputs.
        logger.info(f"VALUES for ast_list, t_suffix, mysql_attrs respectively: \n {ast_list} | {t_suffix} | {mysql_attrs}")
        return {
            "asteroid_list": ast_list,
            "table_suffix": t_suffix,
            "table_attributes": mysql_attrs
        }
    else:
        # Local import keeps this chunk self-contained; the module header is not
        # visible here, so we avoid assuming `io` is already imported.
        from io import StringIO

        # Materialize the CSV payload ONCE. The original called
        # `ast_list.read()` twice, which returns an empty string on the second
        # call for stream inputs; a plain str has no `.read()` at all.
        raw_csv = ast_list.read() if hasattr(ast_list, "read") else ast_list

        # Setting header to None to trick Pandas as it ignores them otherwise.
        # Only the first row is needed: it carries the attribute (column) names.
        df_attrs = pandas.read_csv(
            StringIO(raw_csv), sep='|', header=None, index_col=False,
            dtype=str, keep_default_na=False, nrows=1)
        # Creates final tuple of attributes from dataframe attributes
        final_attrs = [tuple(x) for x in df_attrs.values]
        final_attrs = final_attrs[0]

        # Extract rows from file and create tuples (header row consumed by pandas).
        df_tuples = pandas.read_csv(
            StringIO(raw_csv), sep='|', na_values=None,
            keep_default_na=False, dtype=str)
        df_tuples = df_tuples.apply(tuple, axis=1)

        # Compare dwr attrs against s3 attrs to determine ALTER statement requirement and run, if necessary
        for attr in final_attrs:
            if attr in mysql_attrs:
                logger.info('No new attribute to record')
            else:
                logger.info(attr)
                alter_attr_statement = f"""
                ALTER TABLE {SCHEMA}.table_{t_suffix} ADD `{attr}` VARCHAR(50)"""
                cursor_obj.execute(alter_attr_statement)

        # Strip empty quotes in df_tuples, transform to None (SQL NULL) and
        # build the final tuples array for INSERT.
        tuples_nullified = [
            tuple(None if elem == '' else elem for elem in tup)
            for tup in df_tuples
        ]

        # Build dynamic attribute strings for INSERT. join() also fixes the
        # original's duplicate-last-attribute bug (it compared each attr by
        # VALUE against the final element to decide the separator).
        attr_str_insert = ", ".join(f"`{attr}`" for attr in final_attrs)
        attr_var_insert = ",".join(["%s"] * len(final_attrs))

        # Dynamic Insert: upsert every row in one executemany round trip.
        replace_statement = f"""
        REPLACE INTO {SCHEMA}.table_{t_suffix}
        ({attr_str_insert}) VALUES ({attr_var_insert})"""
        cursor_obj.executemany(replace_statement, tuples_nullified)
        # Lazy %-args: the original passed rowcount as the log MESSAGE itself,
        # which raises a logging formatting error at emit time.
        logger.info("%s Record(s) inserted successfully into table_%s",
                    cursor_obj.rowcount, t_suffix)
        return f"Record(s) inserted successfully into table_{t_suffix}"