in lambda/build/lambda_start_pipeline.py [0:0]
def check_pipeline(job_id, pipeline_name, pipeline_execution_arn=None):
try:
if pipeline_execution_arn is None:
logger.info(
f"Starting SageMaker Pipeline: {pipeline_name} for job: {job_id}"
)
response = sm_client.start_pipeline_execution(
PipelineName=pipeline_name,
PipelineExecutionDisplayName=f"codepipeline-{job_id}",
PipelineParameters=[
{"Name": "InputSource", "Value": "CodePipeline"},
],
PipelineExecutionDescription="SageMaker Drift Detection Pipeline",
ClientRequestToken=job_id,
)
logger.debug(response)
pipeline_execution_arn = response["PipelineExecutionArn"]
logger.info(f"SageMaker Pipeline arn: {pipeline_execution_arn} started")
else:
logger.info(
f"Checking SageMaker Pipeline: {pipeline_execution_arn} for job: {job_id}"
)
response = sm_client.describe_pipeline_execution(
PipelineExecutionArn=pipeline_execution_arn
)
logger.debug(response)
pipeline_execution_status = response["PipelineExecutionStatus"]
logger.info(
f"SageMaker Pipeline arn: {pipeline_execution_arn} {pipeline_execution_status}"
)
if pipeline_execution_status in ["Failed", "Stopped"]:
result = {
"type": "JobFailed",
"message": f"Pipeline Status is {pipeline_execution_status}",
"externalExecutionId": pipeline_execution_arn,
}
codepipeline.put_job_failure_result(jobId=job_id, failureDetails=result)
return 400, result
elif pipeline_execution_status in ["Executing", "Succeeded"]:
result = {
"Status": pipeline_execution_status,
"PipelineExecutionArn": pipeline_execution_arn,
}
codepipeline.put_job_success_result(
jobId=job_id, outputVariables=result
)
return 200, result
logger.info(f"Continuing code pipeline job: {job_id}")
codepipeline.put_job_success_result(
jobId=job_id,
continuationToken=pipeline_execution_arn,
)
return 202, {"PipelineExecutionArn": pipeline_execution_arn}
except ClientError as e:
error_code = e.response["Error"]["Code"]
error_message = e.response["Error"]["Message"]
result = {
"type": "JobFailed",
"message": error_message,
}
logger.error(error_message)
if error_code != "InvalidJobStateException":
codepipeline.put_job_failure_result(jobId=job_id, failureDetails=result)
return 500, result
except Exception as e:
logger.error(e)
raise e