in model/run_pipeline.py [0:0]
def create_baseline_step(input_data, execution_input, region, role):
    """Build the Step Functions ProcessingStep that runs a SageMaker
    Model Monitor baseline job.

    Args:
        input_data: Dict with a "BaselineUri" key pointing at the S3
            baseline dataset.
        execution_input: Step Functions ExecutionInput providing
            "BaselineOutputUri", "BaselineJobName", experiment/trial
            names, and git/data version tags at execution time.
        region: AWS region used to resolve the model-monitor image URI.
        role: IAM role ARN the processing job runs as.

    Returns:
        A stepfunctions ProcessingStep with a catch that routes
        "States.TaskFailed" to a Fail state.
    """
    # Define the environment for the model-monitor container
    dataset_format = DatasetFormat.csv()
    env = {
        "dataset_format": json.dumps(dataset_format),
        "dataset_source": "/opt/ml/processing/input/baseline_dataset_input",
        "output_path": "/opt/ml/processing/output",
        "publish_cloudwatch_metrics": "Disabled",  # Have to be disabled from processing job?
    }
    # Define the inputs and outputs
    inputs = [
        ProcessingInput(
            source=input_data["BaselineUri"],
            destination="/opt/ml/processing/input/baseline_dataset_input",
            input_name="baseline_dataset_input",
        ),
    ]
    outputs = [
        ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=execution_input["BaselineOutputUri"],
            output_name="monitoring_output",
        ),
    ]
    # Get the default model monitor container for this region
    model_monitor_container_uri = retrieve(
        region=region, framework="model-monitor", version="latest"
    )
    # Create the processor that runs the baseline analysis
    monitor_analyzer = Processor(
        image_uri=model_monitor_container_uri,
        role=role,
        instance_count=1,
        instance_type="ml.m5.xlarge",
        max_runtime_in_seconds=1800,
        env=env,
    )
    # Create the processing step; job name and experiment config are
    # resolved from the execution input at run time
    baseline_step = steps.sagemaker.ProcessingStep(
        "Baseline Job",
        processor=monitor_analyzer,
        job_name=execution_input["BaselineJobName"],
        inputs=inputs,
        outputs=outputs,
        experiment_config={
            "ExperimentName": execution_input["ExperimentName"],  # '$.ExperimentName',
            "TrialName": execution_input["TrialName"],
            "TrialComponentDisplayName": "Baseline",
        },
        tags={
            "GitBranch": execution_input["GitBranch"],
            "GitCommitHash": execution_input["GitCommitHash"],
            "DataVersionId": execution_input["DataVersionId"],
        },
    )
    # Route any task failure to an explicit Fail state
    baseline_step.add_catch(
        steps.states.Catch(
            error_equals=["States.TaskFailed"],
            next_step=steps.states.Fail(
                "Baseline failed", cause="SageMakerBaselineJobFailed"
            ),
        )
    )
    return baseline_step