def insert_records()

in functions/source/bootstrap_redshift/bootstrap_redshift.py [0:0]


def _s3_key_prefix(properties):
    """Return the S3 key prefix built from the optional 'Path' property.

    A non-empty path is normalised to end with '/'. A missing or empty path
    yields '' so that object keys are resolved at the bucket root.
    """
    prefix = properties.get('Path', '')
    # Appending '/' to an empty path would produce keys such as
    # '/customers.csv', which name a different S3 object than 'customers.csv'.
    if prefix and not prefix.endswith('/'):
        prefix += '/'
    return prefix


def _copy_tables_from_s3(connection, bucket_name, prefix):
    """Fetch '<prefix><table>.csv' for every entry of table_names and load it.

    Each object is downloaded, decoded as UTF-8 (undecodable bytes are
    ignored), parsed as comma-separated rows and handed to the matching
    save_*_data loader. The cursor is closed even when a loader raises.
    """
    loaders = {
        'customers': save_customers_data,
        'employees': save_employees_data,
        'offices': save_offices_data,
        'orderdetails': save_orderdetails_data,
        'orders': save_orders_data,
        'payments': save_payments_data,
        'productlines': save_productlines_data,
        'products': save_products_data,
    }
    cursor = connection.cursor()
    try:
        for table in table_names:
            table_data = s3_service.get_object(Bucket=bucket_name, Key=prefix + table + '.csv')
            text_object_data = table_data['Body'].read().decode('utf-8', 'ignore')
            csv_data = csv.reader(text_object_data.split("\n"), delimiter=',')
            logger.info('started processing table :' + str(table))
            loader = loaders.get(table)
            if loader is None:
                # Previously such tables were skipped without any trace.
                logger.warning('no loader available for table :' + str(table))
                continue
            loader(csv_data, cursor, connection)
            logger.info('completed processing table :' + str(table))
    finally:
        cursor.close()


def insert_records(event):
    """Recreate the tables and load their CSV data from S3.

    Connection settings ('DatabaseName', 'RedshiftCluster', 'DatabasePort',
    'MasterUsername', 'MasterUserPassword'), the source 'BucketName' and the
    optional key prefix 'Path' are read from event['ResourceProperties'].

    Steps: drop existing tables, create them, then copy one CSV object per
    table. A later step only runs when the previous one reported success.

    Args:
        event: mapping that contains a 'ResourceProperties' mapping.

    Returns:
        The result of the last table drop/create step that ran (truthy when
        everything succeeded), or False when any exception was raised; the
        exception is logged and never propagated.
    """
    is_processed = True
    try:
        properties = event['ResourceProperties']
        db_name = properties['DatabaseName']
        end_point = properties['RedshiftCluster']
        db_port = properties['DatabasePort']
        db_username = properties['MasterUsername']
        db_password = properties['MasterUserPassword']
        connection = get_redshift_connection(end_point, db_username, db_password, db_port, db_name)
        # The finally block lives inside the outer try so that a failing
        # close() is still logged and reported as False, as before.
        try:
            is_processed = delete_tables_ifExist(connection)
            if is_processed:
                is_processed = create_tables(connection)
                bucket_name = properties['BucketName']
                prefix = _s3_key_prefix(properties)
                # copy data
                if is_processed:
                    _copy_tables_from_s3(connection, bucket_name, prefix)
        finally:
            # Release the connection on failure paths too, not only on success.
            connection.close()
    except Exception as ex:
        is_processed = False
        logger.error('Failed to create or update tables/data:' + str(ex))
    return is_processed