# dynamic_mysql_crud_ops()
# in python/lambda-ddb-mysql-etl-pipeline/lambda/dbwrite.py [0:0]

def dynamic_mysql_crud_ops(ast_list, t_suffix, mysql_attrs):
    """Backs dataset attrs against table attrs then runs CRUD ops dynamically.

    Args:
        ast_list: File-like object of s3 body data; its ``.read()`` yields the
            pipe-delimited CSV payload (str or bytes).
        t_suffix (str): The table name suffix of the write target table.
        mysql_attrs (list): Array of column names from the DB table in question.

    Returns:
        dict: Echo of the inputs when ``db_conn`` is None (dry-run mode).
        str: Success message once the REPLACE INTO has executed.

    Notes:
        Prints results to CloudWatch Logs will work if one includes DB
        Credentials and removes `None` flag on `db_conn` herein.
    """
    import io  # local import keeps the dry-run path dependency-free

    if db_conn is None:
        # Dry-run mode: no DB connection configured, just echo the inputs.
        logger.info(
            f"VALUES for ast_list, t_suffix, mysql_attrs respectively: \n"
            f" {ast_list} | {t_suffix} | {mysql_attrs}"
        )
        return {
            "asteroid_list": ast_list,
            "table_suffix": t_suffix,
            "table_attributes": mysql_attrs,
        }

    # Buffer the body once: a streaming body (e.g. an S3 StreamingBody)
    # returns empty on a second .read(), so both parses below must share
    # the same buffered text.
    body = ast_list.read()
    if isinstance(body, bytes):
        body = body.decode("utf-8")

    # Setting header to None to trick Pandas as it ignores them otherwise.
    df_attrs = pandas.read_csv(
        io.StringIO(body), sep="|", header=None, index_col=False,
        dtype=str, keep_default_na=False, nrows=1,
    )
    # Creates final tuple of attributes from the first (header) row.
    final_attrs = tuple(df_attrs.values[0])

    # Extract rows from file and create tuples.
    df_tuples = pandas.read_csv(
        io.StringIO(body), sep="|", na_values=None,
        keep_default_na=False, dtype=str,
    )
    df_tuples = df_tuples.apply(tuple, axis=1)

    # Compare DB attrs against s3 attrs to determine ALTER statement
    # requirement and run, if necessary.
    for attr in final_attrs:
        if attr in mysql_attrs:
            logger.info('No new attribute to record')
        else:
            logger.info(attr)
            # NOTE(review): `attr` comes from the input file and is
            # interpolated into DDL (identifiers cannot be parameterized);
            # sanitize upstream if the file is untrusted.
            alter_attr_statement = f"""
                ALTER TABLE {SCHEMA}.table_{t_suffix} ADD `{attr}` VARCHAR(50)"""
            cursor_obj.execute(alter_attr_statement)

    # Strip empty quotes in df_tuples, transform to None and build final
    # tuples array for INSERT.
    tuples_nullified = [
        tuple(None if elem == '' else elem for elem in tup)
        for tup in df_tuples
    ]

    # Build dynamic attribute strings for INSERT. Joining avoids the
    # original last-element value comparison, which misfires when the
    # final column name also appears earlier in the header.
    attr_str_insert = ", ".join(f"`{attr}`" for attr in final_attrs)
    attr_var_insert = ",".join(["%s"] * len(final_attrs))

    # Dynamic upsert: REPLACE INTO with parameterized row values.
    replace_statement = f"""
        REPLACE INTO {SCHEMA}.table_{t_suffix}
        ({attr_str_insert}) VALUES ({attr_var_insert})"""
    cursor_obj.executemany(replace_statement, tuples_nullified)
    # logger.info takes the message first; use lazy %-args for the values.
    logger.info(
        "%s Record(s) inserted successfully into table_%s",
        cursor_obj.rowcount, t_suffix,
    )

    return f"Record(s) inserted successfully into table_{t_suffix}"