in functions/source/bootstrap_redshift/bootstrap_redshift.py [0:0]
def insert_records(event):
is_processed = True
try:
db_name = event['ResourceProperties']['DatabaseName']
end_point = event['ResourceProperties']['RedshiftCluster']
db_port = event['ResourceProperties']['DatabasePort']
db_username = event['ResourceProperties']['MasterUsername']
db_password = event['ResourceProperties']['MasterUserPassword']
connection = get_redshift_connection(end_point, db_username, db_password, db_port, db_name)
is_processed = delete_tables_ifExist(connection)
if is_processed:
is_processed = create_tables(connection)
bucket_name = event['ResourceProperties']['BucketName']
path = ''
if 'Path' in event['ResourceProperties']:
path = event['ResourceProperties']['Path']
if not path.endswith('/'):
path = path + '/'
# copy data
if is_processed:
cursor = connection.cursor()
for table in table_names:
table_data = s3_service.get_object(Bucket=bucket_name, Key=path + table + '.csv')
text_object_data = table_data['Body'].read().decode('utf-8','ignore')
csv_data = csv.reader(text_object_data.split("\n"), delimiter=',')
logger.info('started processing table :'+str(table))
if table == 'customers':
save_customers_data(csv_data, cursor, connection)
logger.info('completed processing table :' + str(table))
elif table == 'employees':
save_employees_data(csv_data, cursor, connection)
logger.info('completed processing table :' + str(table))
elif table == 'offices':
save_offices_data(csv_data, cursor, connection)
logger.info('completed processing table :' + str(table))
elif table == 'orderdetails':
save_orderdetails_data(csv_data, cursor, connection)
logger.info('completed processing table :' + str(table))
elif table == 'orders':
save_orders_data(csv_data, cursor, connection)
logger.info('completed processing table :' + str(table))
elif table == 'payments':
save_payments_data(csv_data, cursor, connection)
logger.info('completed processing table :' + str(table))
elif table == 'productlines':
save_productlines_data(csv_data, cursor, connection)
logger.info('completed processing table :' + str(table))
elif table == 'products':
save_products_data(csv_data, cursor, connection)
logger.info('completed processing table :' + str(table))
cursor.close()
connection.close()
except Exception as ex:
is_processed = False
logger.error('Failed to create or update tables/data:'+str(ex))
return is_processed