def handler()

in use-cases/etl-orchestration-with-lambda-and-step-functions/code/redshift_batch_data_api.py [0:0]


def handler(event, context):
    """Lambda entry point: submit a SQL batch or report a batch's status.

    The requested action is read from ``event['Input']['action']``.
    Allowed values:

    * ``setup_sales_data_pipeline`` / ``run_sales_data_pipeline`` -- read the
      cluster settings from environment variables, split the script at
      ``event['Input']['etl_script_s3_path']`` into statements with
      ``split_sql_statement`` and submit them with ``run_batch_sql``.
    * ``get_sql_status`` -- look up the batch identified by
      ``event['Input']['sql_batch_id']`` with ``get_sql_status``.

    Args:
        event: Invocation payload; must contain an ``'Input'`` mapping.
        context: Lambda context object (not used by this function).

    Returns:
        ``{'sql_batch_id': ...}`` for the two pipeline actions, or
        ``{'status': ...}`` for ``get_sql_status``.

    Raises:
        KeyError: If ``event`` has no ``'Input'`` key (raised before the
            ``try`` block, so it is not wrapped).
        NameError: Re-raised unchanged, with its original traceback.
        Exception: Any other failure -- including a missing or unknown
            action -- wrapped in a message naming the action, with the
            original exception attached as ``__cause__``.
    """
    action = event['Input'].get('action')

    try:
        if action in ('setup_sales_data_pipeline', 'run_sales_data_pipeline'):
            # Connection details come from the function's environment.
            redshift_config = {
                # cluster identifier for the Amazon Redshift cluster
                "redshift_cluster_id": os.getenv('redshift_cluster_identifier'),
                # database name for the Amazon Redshift cluster
                "redshift_database": os.getenv('redshift_database_name'),
                # database user used to execute the SQL statements
                "redshift_user": os.getenv('redshift_database_user'),
            }

            # Get details to run the script
            job_run_date = os.getenv('job_run_date')
            etl_script_s3_path = event['Input'].get('etl_script_s3_path')
            sql_statements = split_sql_statement(etl_script_s3_path, job_run_date)
            # The script's file name doubles as the statement name.
            statement_name = Path(etl_script_s3_path).name

            # Execute the statements in the configured Amazon Redshift cluster.
            sql_batch_id = run_batch_sql(redshift_config, sql_statements, statement_name)
            api_response = {'sql_batch_id': sql_batch_id}
        elif action == "get_sql_status":
            # sql_batch_id identifies a previously submitted batch.
            sql_batch_id = event['Input'].get('sql_batch_id')
            # Get the status of that earlier Data API call.
            api_status = get_sql_status(sql_batch_id)

            api_response = {'status': api_status}
        else:
            # str() keeps this working when the action is missing (None);
            # plain concatenation would raise TypeError and hide the real problem.
            raise ValueError("Invalid Action: " + str(action))
    except NameError:
        # Bare raise keeps the original exception object and traceback.
        raise
    except Exception as exception:
        # Message text (including its spelling) is kept as-is in case
        # downstream consumers match on it.
        error_message = "Encountered exeption on:" + str(action) + ":" + str(exception)
        raise Exception(error_message) from exception
    return api_response