in repos/serving/infra/model_endpoint_construct.py [0:0]
def get_processing_output(
pipeline_execution_arn: str,
step_name: str = "BaselineJob",
output_name: str = "monitoring_output",
):
"""Filters the model packages based on a list of model package versions.
Args:
pipeline_execution_arn: The pipeline execution arn
step_name: The optional processing step name
output_name: The output value to pick from the processing job
Returns:
The outputs from the processing job
"""
steps = sm_client.list_pipeline_execution_steps(
PipelineExecutionArn=pipeline_execution_arn
)["PipelineExecutionSteps"]
processing_job_arn = [
s["Metadata"]["ProcessingJob"]["Arn"]
for s in steps
if s["StepName"] == step_name
][0]
processing_job_name = processing_job_arn.split("/")[-1]
outputs = sm_client.describe_processing_job(ProcessingJobName=processing_job_name)[
"ProcessingOutputConfig"
]["Outputs"]
return [o["S3Output"]["S3Uri"] for o in outputs if o["OutputName"] == output_name][
0
]