in use-cases/lambda-chaining-with-redshift-data-api/scripts/LambdaRedshiftSetupObjects.py [0:0]
# SQL literals are kept flush-left at module level so that no Python
# indentation whitespace ends up embedded in the statements sent to Redshift.

# Statements run when the custom resource receives a 'Delete' request.
_TEARDOWN_SQL = '''
DROP PROCEDURE run_elt_process();
DROP MATERIALIZED VIEW IF EXISTS nyc_yellow_taxi_volume_analysis;
DROP TABLE IF EXISTS nyc_yellow_taxi;
'''

# Statements run for every other request type. The single '{}' placeholder
# is filled with the cluster IAM role used by the COPY command.
_SETUP_SQL_TEMPLATE = '''
CREATE TABLE IF NOT EXISTS nyc_yellow_taxi
(pickup_date DATE
, pickup_datetime TIMESTAMP
, dropoff_datetime TIMESTAMP
, ratecode SMALLINT
, passenger_count SMALLINT
, trip_distance FLOAT4
, fare_amount FLOAT4
, total_amount FLOAT4
, payment_type SMALLINT
, vendorid VARCHAR(20))
SORTKEY(pickup_date);
DROP MATERIALIZED VIEW IF EXISTS nyc_yellow_taxi_volume_analysis;
CREATE MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis
AS
SELECT
DATE_TRUNC('mon',pickup_date) pickup_month
, ROUND(AVG(trip_distance),2) avg_distance
, ROUND(AVG(fare_amount),2) avg_fare
, COUNT(1) total_trips
, SUM(trip_distance) total_distance_per_month
, SUM(fare_amount) total_fare_per_month
FROM nyc_yellow_taxi
GROUP BY 1;
CREATE OR REPLACE PROCEDURE run_elt_process()
AS $$
BEGIN
truncate table nyc_yellow_taxi;
COPY nyc_yellow_taxi FROM 's3://event-driven-app-with-lambda-redshift/nyc_yellow_taxi_raw/'
IAM_ROLE '{}'
region 'us-west-2' delimiter '|';
REFRESH MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis;
END;
$$ LANGUAGE plpgsql;
'''


def handler(event, context):
    """Create or drop the Redshift demo objects for a custom resource request.

    Reads the connection settings from ``event['ResourceProperties']``,
    builds the SQL for the request, and hands it to
    ``invoke_redshift_data_api_lambda``. A ``'Delete'`` request drops the
    stored procedure, materialized view and table; any other request type
    creates them.

    The outcome is always reported through ``cfnresponse.send``: ``SUCCESS``
    when the invocation returns, ``FAILED`` when anything raises. Without the
    failure response CloudFormation would receive no reply at all and the
    stack operation would wait until its own timeout.

    Args:
        event: Custom resource request; must contain ``RequestType`` and a
            ``ResourceProperties`` mapping with ``lambda_arn``,
            ``redshift_cluster_id``, ``redshift_database``, ``redshift_user``,
            ``redshift_cluster_iam_role`` and ``sns_topic_arn``.
        context: Lambda context object, passed through to ``cfnresponse``.
    """
    logger.info(json.dumps(event))
    try:
        properties = event['ResourceProperties']
        lambda_arn = properties['lambda_arn']
        redshift_cluster_id = properties['redshift_cluster_id']
        redshift_database = properties['redshift_database']
        redshift_user = properties['redshift_user']
        redshift_cluster_iam_role = properties['redshift_cluster_iam_role']
        sns_topic_arn = properties['sns_topic_arn']

        if event['RequestType'] == 'Delete':
            sql_text = _TEARDOWN_SQL
            completion_message = 'Delete complete'
        else:
            sql_text = _SETUP_SQL_TEMPLATE.format(redshift_cluster_iam_role)
            completion_message = 'Create complete'

        response = invoke_redshift_data_api_lambda(
            lambda_arn, redshift_cluster_id, redshift_database,
            redshift_user, sql_text, sns_topic_arn)
        logger.info(response)
    except Exception:
        # Top-level boundary: log the traceback and tell CloudFormation the
        # request failed. Deliberately not re-raised, so that a Lambda retry
        # cannot re-run the SQL and send a second response.
        logger.exception('Redshift object setup failed')
        cfnresponse.send(event, context, cfnresponse.FAILED, {
            'Data': 'Request failed; see the function logs for details'})
        return

    cfnresponse.send(event, context, cfnresponse.SUCCESS, {
        'Data': completion_message})