in model/run_pipeline.py [0:0]
def get_pipeline_execution_id(pipeline_name, codebuild_id):
codepipeline = boto3.client("codepipeline")
response = codepipeline.get_pipeline_state(name=pipeline_name)
for stage in response["stageStates"]:
for action in stage["actionStates"]:
# Return the matching stage with the same external id
if (
"latestExecution" in action
and "externalExecutionId" in action["latestExecution"]
and action["latestExecution"]["externalExecutionId"] == codebuild_id
):
return stage["latestExecution"]["pipelineExecutionId"]