# pipeline_utils.py
def create_lambda_fcn(flow_uri, pipeline_name):
    """Generate source code for an AWS Lambda function that starts a
    SageMaker Pipeline execution whenever a new .csv file lands in Amazon S3."""
    # Set variables: parse the bucket and key out of the .flow file's S3 URI
    print('Gathering variables ...')
    flow_bucket = flow_uri.split('/')[2]
    flow_key = '/'.join(flow_uri.split('/')[3:])
    # Create skeleton Lambda code; %(...)s placeholders are filled in from locals()
    print('Creating code for AWS Lambda function ...')
    lambda_code = """
import json
import boto3

s3 = boto3.resource('s3')
sm = boto3.client('sagemaker')

def lambda_handler(event, context):
    # Check the version of Boto3 - it must be at least 1.16.55
    print(f"The version of Boto3 is {boto3.__version__}")
    # Get the location where the new data (csv) file was uploaded
    data_bucket = event['Records'][0]['s3']['bucket']['name']
    data_key = event['Records'][0]['s3']['object']['key']
    print(f"A new file named {data_key} was just uploaded to Amazon S3 in {data_bucket}")
    # Values for where the Data Wrangler .flow file is saved, filled in by create_lambda_fcn
    flow_bucket = '%(flow_bucket)s'
    flow_key = '%(flow_key)s'
    pipeline_name = '%(pipeline_name)s'
    # Derive a display name for the pipeline execution from the data file's name
    execution_display = f"{data_key.split('/')[-1].replace('_','').replace('.csv','')}"
    # Get the .flow file from Amazon S3
    get_object = s3.Object(flow_bucket, flow_key)
    get_flow = get_object.get()
    # Read, update, and save the .flow file
    flow_content = json.loads(get_flow['Body'].read())
    flow_content['nodes'][0]['parameters']['dataset_definition']['name'] = data_key.split('/')[-1]
    flow_content['nodes'][0]['parameters']['dataset_definition']['s3ExecutionContext']['s3Uri'] = f"s3://{data_bucket}/{data_key}"
    new_flow_key = flow_key.replace('.flow', '-' + data_key.split('/')[-1].replace('.csv','') + '.flow')
    new_flow_uri = f"s3://{flow_bucket}/{new_flow_key}"
    put_object = s3.Object(flow_bucket, new_flow_key)
    put_flow = put_object.put(Body=json.dumps(flow_content))
    # Start the pipeline execution, passing the new flow's URI as a parameter
    start_pipeline = sm.start_pipeline_execution(
        PipelineName=pipeline_name,
        PipelineExecutionDisplayName=execution_display,
        PipelineParameters=[
            {
                'Name': 'InputFlow',
                'Value': new_flow_uri
            },
        ],
        PipelineExecutionDescription=data_key
    )
    print(start_pipeline)
    return 'SageMaker Pipeline execution has been started successfully'
""" % locals()
    # Update success status
    print('SUCCESS: Successfully created code for AWS Lambda function!')
    return lambda_code
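
# A minimal usage sketch: generate the Lambda source above and deploy it
# with boto3. The flow URI, pipeline name, function name, and IAM role ARN
# below are hypothetical placeholders, not values from this module. Lambda's
# create_function API expects a zipped deployment package, so the generated
# source is zipped in memory first.
if __name__ == '__main__':
    import io
    import zipfile
    import boto3

    code = create_lambda_fcn(
        flow_uri='s3://example-bucket/flows/example.flow',  # hypothetical
        pipeline_name='example-pipeline',                   # hypothetical
    )

    # Package the generated source as lambda_function.py in an in-memory zip
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w') as archive:
        archive.writestr('lambda_function.py', code)

    # Create the Lambda function; the handler name matches the generated code
    boto3.client('lambda').create_function(
        FunctionName='start-sagemaker-pipeline',                    # hypothetical
        Runtime='python3.9',
        Role='arn:aws:iam::123456789012:role/example-lambda-role',  # hypothetical
        Handler='lambda_function.lambda_handler',
        Code={'ZipFile': buffer.getvalue()},
    )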