in use-cases/lambda-chaining-with-redshift-data-api/scripts/LambdaRedshiftDataApiETL.py [0:0]
def handler(event, context):
print(event)
# input parameters passed from the caller event
# cluster identifier for the Amazon Redshift cluster
redshift_cluster_id = event['Input'].get('redshift_cluster_id')
# database name for the Amazon Redshift cluster
redshift_database = event['Input'].get('redshift_database')
# database user in the Amazon Redshift cluster with access to run relevant SQL queries
redshift_user = event['Input'].get('redshift_user')
# Amazon SNS topic to be used to publish notifications to end-users
sns_topic_arn = event['Input'].get('sns_topic_arn')
# action to be taken by the lambda function. Allowed values: [run_sql, status_check, notify]
action = event['Input'].get('action')
# sql text to be run. e.g. call my_stored_procedure(input_params)
sql_text = event['Input'].get('sql_text')
# subject line to be used while publishing message through Amazon SNS
subject = event['Input'].get('subject')
# detailed body to be used while publishing message through Amazon SNS
body = event['Input'].get('body')
# query_id to input for action status_check
query_id = event['Input'].get('query_id')
try:
if action == 'run_sql':
# s3_bucket to unload redshift data
s3_bucket = event['Input'].get('s3bucketname')
redshift_iam_role = event['Input'].get('redshift_iam_role')
# run the input SQL statement in the specified Amazon Redshift cluster
# sql text to be run. e.g. call my_stored_procedure(input_params)
sql_text = "unload ('select * from nyc_yellow_taxi_volume_analysis where pickup_month=''2015-12-01 00:00:00'' ')" + \
" to '" + s3_bucket + "'" + " iam_role '" + redshift_iam_role + "' FORMAT PARQUET " + \
" ALLOWOVERWRITE PARTITION BY (PICKUP_MONTH); "
res = run_sql(client, sql_text, redshift_database,
redshift_user, redshift_cluster_id)
elif action == "status_check":
# check status of a previously run query
res = status_check(client, query_id)
elif action == "notify":
# notify to end-users by publishing message in Amazon SNS service
res = notify(sns_topic_arn, subject, body)
else:
raise ValueError("Invalid Action: " + action)
except Exception as e:
subject = "Error:" + action + ":" + str(e)
body = traceback.format_exc()
notify(sns_topic_arn, subject, body)
raise
return {'statusCode': 200, 'body': json.dumps(res)}