in use-cases/etl-orchestration-with-lambda-and-step-functions/code/redshift_batch_data_api.py [0:0]
import os
from pathlib import Path


def handler(event, context):
    # Action to be taken by the Lambda function.
    # Allowed values: [setup_sales_data_pipeline, run_sales_data_pipeline, get_sql_status]
    action = event['Input'].get('action')
    try:
        if action in ('setup_sales_data_pipeline', 'run_sales_data_pipeline'):
            redshift_config = {}
            # Get database connection details from the environment
            # Cluster identifier for the Amazon Redshift cluster
            redshift_config["redshift_cluster_id"] = os.getenv('redshift_cluster_identifier')
            # Database name in the Amazon Redshift cluster
            redshift_config["redshift_database"] = os.getenv('redshift_database_name')
            # Database user in the Amazon Redshift cluster with access to execute the relevant SQL queries
            redshift_config["redshift_user"] = os.getenv('redshift_database_user')
            # Get the details needed to run the script
            job_run_date = os.getenv('job_run_date')
            etl_script_s3_path = event['Input'].get('etl_script_s3_path')
            sql_statements = split_sql_statement(etl_script_s3_path, job_run_date)
            statement_name = Path(etl_script_s3_path).name
            # Execute the input SQL statements in the specified Amazon Redshift cluster
            sql_batch_id = run_batch_sql(redshift_config, sql_statements, statement_name)
            api_response = {'sql_batch_id': sql_batch_id}
        elif action == "get_sql_status":
            # sql_batch_id is the required input for action get_sql_status
            sql_batch_id = event['Input'].get('sql_batch_id')
            # Get the status of a previously executed Data API call
            api_status = get_sql_status(sql_batch_id)
            api_response = {'status': api_status}
        else:
            raise ValueError(f"Invalid Action: {action}")
    except NameError:
        # Re-raise as-is to keep the original traceback
        raise
    except Exception as exception:
        raise Exception(f"Encountered exception on {action}: {exception}") from exception
    return api_response
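

# ---------------------------------------------------------------------------
# The helpers referenced above are not shown in this excerpt; the sketches
# below are illustrative stand-ins, not the repository's actual
# implementations. They assume the ETL script is a ";"-separated SQL file in
# S3 containing a {job_run_date} placeholder, and they use the boto3 Redshift
# Data API client (batch_execute_statement / describe_statement).
# ---------------------------------------------------------------------------
import boto3


def split_sql_statement(etl_script_s3_path, job_run_date):
    """Sketch: download the ETL script from S3 and split it into individual
    SQL statements, substituting the job run date."""
    bucket, _, key = etl_script_s3_path.replace("s3://", "").partition("/")
    script = boto3.client('s3').get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')
    # Assumed convention: the script marks the run date as {job_run_date}
    script = script.format(job_run_date=job_run_date)
    return [stmt.strip() for stmt in script.split(';') if stmt.strip()]


def run_batch_sql(redshift_config, sql_statements, statement_name):
    """Sketch: submit the SQL statements as a single batch through the
    Redshift Data API and return the batch statement id."""
    client = boto3.client('redshift-data')
    response = client.batch_execute_statement(
        ClusterIdentifier=redshift_config["redshift_cluster_id"],
        Database=redshift_config["redshift_database"],
        DbUser=redshift_config["redshift_user"],
        Sqls=sql_statements,
        StatementName=statement_name,
        WithEvent=True,  # emit an EventBridge event when the batch completes
    )
    return response['Id']


def get_sql_status(sql_batch_id):
    """Sketch: return the Data API status (SUBMITTED, STARTED, FINISHED,
    FAILED, ABORTED, ...) of a previously submitted batch."""
    response = boto3.client('redshift-data').describe_statement(Id=sql_batch_id)
    status = response['Status']
    if status == 'FAILED':
        raise Exception(f"SQL batch {sql_batch_id} failed: {response.get('Error')}")
    return status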
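

# Example Step Functions task inputs for this handler (illustrative; the S3
# path is hypothetical):
#
#   {"Input": {"action": "run_sales_data_pipeline",
#              "etl_script_s3_path": "s3://my-etl-bucket/scripts/sales_etl.sql"}}
#
# followed by polling calls of the form:
#
#   {"Input": {"action": "get_sql_status", "sql_batch_id": "<id from the previous call>"}}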