in triggerStepFunctions/app.py [0:0]
def lambda_handler(event, context):
    """CodePipeline action handler that launches a Step Functions execution.

    Intended to be configured as a Lambda invoke action in a CodePipeline
    stage. It downloads the stage's first input artifact (a zip) from S3,
    looks inside it for the JSON file named by the env-derived constant
    SCENARIO_DEFINITIONS_FILENAME (default scenarios.json), injects the
    CodePipeline job id into that JSON, and starts the state machine
    identified by STATE_MACHINE_ARN with it.

    On any error — including the scenario file being absent from the
    artifact — the CodePipeline job is failed via put_job_failure_result
    so the pipeline does not hang waiting on this action.

    NOTE(review): the job id is passed to the state machine inside the
    input (``codePipelineJobId``), so job *success* is presumably reported
    by the state machine itself, not here — confirm before adding
    put_job_success_result to this handler.

    Parameters
    ----------
    event : dict
        CodePipeline Lambda-invoke event ('CodePipeline.job' payload).
    context : object
        Lambda context (unused).

    Returns
    -------
    dict
        statusCode 200 plus a JSON body echoing the Step Functions and
        CodePipeline API responses (stringified via default=str).
    """
    step_functions_response = ""
    code_pipeline_response = ""
    job_id = event['CodePipeline.job']['id']
    s3_location_obj = event['CodePipeline.job']['data']['inputArtifacts'][0]['location']['s3Location']
    try:
        # Artifact zips are small; read fully into memory and open in place.
        zip_obj = s3_resource.Object(bucket_name=s3_location_obj['bucketName'], key=s3_location_obj['objectKey'])
        buffer = io.BytesIO(zip_obj.get()["Body"].read())
        with ZipFile(buffer) as zip_out:
            if SCENARIO_DEFINITIONS_FILENAME not in zip_out.namelist():
                # Previously this case fell through silently: the state
                # machine never started and the job was never failed, so
                # the pipeline stalled until its timeout. Fail fast instead.
                raise FileNotFoundError(
                    f"{SCENARIO_DEFINITIONS_FILENAME} not found in input artifact"
                )
            with zip_out.open(SCENARIO_DEFINITIONS_FILENAME) as filecontent:
                launch_json = json.loads(filecontent.read())
            # Pass the job id through so the downstream state machine can
            # report the CodePipeline result when it finishes.
            launch_json['codePipelineJobId'] = job_id
            step_functions_response = step_functions.start_execution(
                stateMachineArn=STATE_MACHINE_ARN,
                input=json.dumps(launch_json)
            )
    except Exception as e:
        # Top-level boundary: report any failure back to CodePipeline so
        # the stage shows the error instead of hanging.
        code_pipeline_response = pipeline.put_job_failure_result(jobId=job_id, failureDetails={
            'type': 'JobFailed',
            'message': str(e)
        })
    return {
        "statusCode": 200,
        "body": json.dumps({
            "step_functions_response": step_functions_response,
            "code_pipeline_response": code_pipeline_response
        }, indent=4, sort_keys=True, default=str),
    }