def check_pipeline()

in lambda/build/lambda_start_pipeline.py [0:0]


def check_pipeline(job_id, pipeline_name, pipeline_execution_arn=None):
    try:
        if pipeline_execution_arn is None:
            logger.info(
                f"Starting SageMaker Pipeline: {pipeline_name} for job: {job_id}"
            )
            response = sm_client.start_pipeline_execution(
                PipelineName=pipeline_name,
                PipelineExecutionDisplayName=f"codepipeline-{job_id}",
                PipelineParameters=[
                    {"Name": "InputSource", "Value": "CodePipeline"},
                ],
                PipelineExecutionDescription="SageMaker Drift Detection Pipeline",
                ClientRequestToken=job_id,
            )
            logger.debug(response)
            pipeline_execution_arn = response["PipelineExecutionArn"]
            logger.info(f"SageMaker Pipeline arn: {pipeline_execution_arn} started")
        else:
            logger.info(
                f"Checking SageMaker Pipeline: {pipeline_execution_arn} for job: {job_id}"
            )
            response = sm_client.describe_pipeline_execution(
                PipelineExecutionArn=pipeline_execution_arn
            )
            logger.debug(response)
            pipeline_execution_status = response["PipelineExecutionStatus"]
            logger.info(
                f"SageMaker Pipeline arn: {pipeline_execution_arn} {pipeline_execution_status}"
            )
            if pipeline_execution_status in ["Failed", "Stopped"]:
                result = {
                    "type": "JobFailed",
                    "message": f"Pipeline Status is {pipeline_execution_status}",
                    "externalExecutionId": pipeline_execution_arn,
                }
                codepipeline.put_job_failure_result(jobId=job_id, failureDetails=result)
                return 400, result
            elif pipeline_execution_status in ["Executing", "Succeeded"]:
                result = {
                    "Status": pipeline_execution_status,
                    "PipelineExecutionArn": pipeline_execution_arn,
                }
                codepipeline.put_job_success_result(
                    jobId=job_id, outputVariables=result
                )
                return 200, result
        logger.info(f"Continuing code pipeline job: {job_id}")
        codepipeline.put_job_success_result(
            jobId=job_id,
            continuationToken=pipeline_execution_arn,
        )
        return 202, {"PipelineExecutionArn": pipeline_execution_arn}
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        result = {
            "type": "JobFailed",
            "message": error_message,
        }
        logger.error(error_message)
        if error_code != "InvalidJobStateException":
            codepipeline.put_job_failure_result(jobId=job_id, failureDetails=result)
        return 500, result
    except Exception as e:
        logger.error(e)
        raise e