in custom_resource/sagemaker_suggest_baseline.py [0:0]
def _s3_file_input(input_name, s3_uri, local_path):
    """Build one ProcessingInputs entry that maps an S3 prefix to a local path."""
    return {
        "InputName": input_name,
        "S3Input": {
            "S3Uri": s3_uri,
            "LocalPath": local_path,
            "S3DataType": "S3Prefix",
            "S3InputMode": "File",
            "S3DataDistributionType": "FullyReplicated",
            "S3CompressionType": "None",
        },
    }


def get_processing_request(event, dataset_format=DatasetFormat.csv()):
    """Build the processing-job request for a baseline suggestion run.

    Args:
        event: Custom resource event. Reads ``event["ResourceProperties"]``,
            ``event["RequestType"]`` and the optional ``event["CrHelperData"]``.
        dataset_format: JSON-serialisable description of the baseline dataset
            format; it is passed to the job via the ``dataset_format``
            environment variable.

    Returns:
        A tuple ``(request, constraints_uri, statistics_uri)`` where
        ``request`` is the processing-job request dict and the two URIs are
        ``constraints.json`` / ``statistics.json`` under ``BaselineResultsUri``.

    Raises:
        KeyError: If a required property (``BaselineInputUri``,
            ``BaselineResultsUri``, ``ProcessingJobName``, ``PassRoleArn``,
            ``ExperimentName``, ``TrialName``) or ``RequestType`` is missing.
    """
    props = event["ResourceProperties"]
    dataset_dir = "/opt/ml/processing/input/baseline_dataset_input"
    output_dir = "/opt/ml/processing/output"

    # Only resolve the default container image when no explicit image is
    # given, so the region lookup is skipped whenever it is not needed.
    if "ImageURI" in props:
        image_uri = props["ImageURI"]
    else:
        image_uri = get_model_monitor_container_uri(helper._region)

    request = {
        "ProcessingInputs": [
            _s3_file_input("baseline_dataset_input", props["BaselineInputUri"], dataset_dir),
        ],
        "ProcessingOutputConfig": {
            "Outputs": [
                {
                    "OutputName": "monitoring_output",
                    "S3Output": {
                        "S3Uri": props["BaselineResultsUri"],
                        "LocalPath": output_dir,
                        "S3UploadMode": props.get("S3UploadMode", "EndOfJob"),
                    },
                }
            ]
        },
        "ProcessingJobName": props["ProcessingJobName"],
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": 1,
                "InstanceType": props.get("InstanceType", "ml.m5.xlarge"),
                "VolumeSizeInGB": 30,
            }
        },
        "StoppingCondition": {
            # Defaults to 30 minutes; int() because the property may arrive as a string.
            "MaxRuntimeInSeconds": int(props.get("MaxRuntimeInSeconds", 1800))
        },
        "AppSpecification": {
            "ImageUri": image_uri,
        },
        "Environment": {
            "dataset_format": json.dumps(dataset_format),
            "dataset_source": dataset_dir,
            "output_path": output_dir,
            "publish_cloudwatch_metrics": props.get("PublishCloudwatchMetrics", "Disabled"),
        },
        "RoleArn": props["PassRoleArn"],
    }
    env = request["Environment"]
    inputs = request["ProcessingInputs"]

    # Add the KmsKeyId to monitoring outputs and cluster volume if provided
    kms_key_id = props.get("KmsKeyId")
    if kms_key_id is not None:
        request["ProcessingOutputConfig"]["KmsKeyId"] = kms_key_id
        request["ProcessingResources"]["ClusterConfig"]["VolumeKmsKeyId"] = kms_key_id

    # Add experiment tracking
    request["ExperimentConfig"] = {
        "ExperimentName": props["ExperimentName"],
        "TrialName": props["TrialName"],
        "TrialComponentDisplayName": "Baseline",
    }

    # Add the optional record pre-processor script. It gets its own directory
    # so it does not share a LocalPath with the post-analytics script input.
    preprocessor_uri = props.get("RecordPreprocessorSourceUri")
    if preprocessor_uri:
        preprocessing_dir = "/opt/ml/processing/code/preprocessing"
        env["record_preprocessor_script"] = (
            preprocessing_dir + "/" + get_file_name(preprocessor_uri)
        )
        inputs.append(_s3_file_input("pre_processor_script", preprocessor_uri, preprocessing_dir))

    # Add the optional post-analytics processor script.
    postprocessor_uri = props.get("PostAnalyticsProcessorSourceUri")
    if postprocessor_uri:
        postprocessing_dir = "/opt/ml/processing/code/postprocessing"
        env["post_analytics_processor_script"] = (
            postprocessing_dir + "/" + get_file_name(postprocessor_uri)
        )
        inputs.append(_s3_file_input("post_processor_script", postprocessor_uri, postprocessing_dir))

    # If this is an update and we have previous constraints / statistics URIs,
    # add them as inputs. Each one is added only when its URI is present.
    previous = event.get("CrHelperData")
    if event["RequestType"] == "Update" and previous is not None:
        previous_constraints = previous.get("BaselineConstraintsUri")
        if previous_constraints:
            logger.debug("Update with constraints: %s", previous_constraints)
            env["baseline_constraints"] = "/opt/ml/processing/baseline/constraints/constraints.json"
            inputs.append(
                _s3_file_input(
                    "constraints",
                    previous_constraints,
                    "/opt/ml/processing/baseline/constraints",
                )
            )
        previous_statistics = previous.get("BaselineStatisticsUri")
        if previous_statistics:
            logger.debug("Update with statistics: %s", previous_statistics)
            env["baseline_statistics"] = "/opt/ml/processing/baseline/stats/statistics.json"
            inputs.append(
                _s3_file_input(
                    "baseline",
                    previous_statistics,
                    "/opt/ml/processing/baseline/stats",
                )
            )

    # Build the constraints and statistics URI from the results URI; strip any
    # trailing slash so the joined keys never contain "//".
    results_uri = props["BaselineResultsUri"].rstrip("/")
    constraints_uri = results_uri + "/constraints.json"
    statistics_uri = results_uri + "/statistics.json"
    return request, constraints_uri, statistics_uri