in src/sagemaker/model_monitor/model_monitoring.py [0:0]
def attach(cls, monitor_schedule_name, sagemaker_session=None):
    """Build a monitor object bound to an existing monitoring schedule.

    The schedule is looked up by name and a monitor is reconstructed from its
    description, so that subsequent describe_schedule or list_executions calls
    point to the given schedule.

    Two schedule layouts are handled:

    * The schedule config carries a ``MonitoringJobDefinitionName``: the
      data quality job definition is described and construction is delegated
      to ``ModelMonitor._attach``.
    * Otherwise the inline ``MonitoringJobDefinition`` is read and ``cls`` is
      instantiated directly from its fields.

    Args:
        monitor_schedule_name (str): The name of the schedule to attach to.
        sagemaker_session (sagemaker.session.Session): Session object which
            manages interactions with Amazon SageMaker APIs and any other
            AWS services needed. If not specified, one is created using
            the default AWS configuration chain.

    Returns:
        The attached monitor: the result of ``ModelMonitor._attach`` for
        schedules that reference a job definition by name, otherwise a new
        ``cls`` instance whose ``monitoring_schedule_name`` is set to
        ``monitor_schedule_name``.

    Raises:
        TypeError: If the schedule references a job definition by name and
            its ``MonitoringType`` differs from ``cls.monitoring_type()``.
        KeyError: If the schedule description lacks one of the fields read
            with plain indexing (for example ``RoleArn`` or ``ClusterConfig``).
    """
    sagemaker_session = sagemaker_session or Session()
    schedule_desc = sagemaker_session.describe_monitoring_schedule(
        monitoring_schedule_name=monitor_schedule_name
    )
    schedule_config = schedule_desc["MonitoringScheduleConfig"]

    job_definition_name = schedule_config.get("MonitoringJobDefinitionName")
    if job_definition_name:
        # The schedule points at a separately stored job definition; make sure
        # it is of the type this monitor class handles before describing it.
        monitoring_type = schedule_config.get("MonitoringType")
        if monitoring_type != cls.monitoring_type():
            raise TypeError(
                "{} can only attach to Data quality monitoring schedule.".format(
                    __class__.__name__
                )
            )
        job_desc = sagemaker_session.sagemaker_client.describe_data_quality_job_definition(
            JobDefinitionName=job_definition_name
        )
        tags = sagemaker_session.list_tags(resource_arn=schedule_desc["MonitoringScheduleArn"])
        return ModelMonitor._attach(
            clazz=cls,
            sagemaker_session=sagemaker_session,
            schedule_desc=schedule_desc,
            job_desc=job_desc,
            tags=tags,
        )

    # No job definition name: the job definition is embedded in the schedule.
    job_definition = schedule_config["MonitoringJobDefinition"]
    role = job_definition["RoleArn"]

    cluster_config = job_definition["MonitoringResources"]["ClusterConfig"]
    instance_count = cluster_config["InstanceCount"]
    instance_type = cluster_config["InstanceType"]
    volume_size_in_gb = cluster_config["VolumeSizeInGB"]
    volume_kms_key = cluster_config.get("VolumeKmsKeyId")

    output_kms_key = job_definition["MonitoringOutputConfig"].get("KmsKeyId")
    max_runtime_in_seconds = job_definition.get("StoppingCondition", {}).get(
        "MaxRuntimeInSeconds"
    )
    # "Environment" is an optional field of the job definition, so a missing
    # key must not abort the attach; the monitor then receives env=None.
    env = job_definition.get("Environment")

    network_config = None
    network_config_dict = job_definition.get("NetworkConfig", {})
    if network_config_dict:
        vpc_config = network_config_dict.get("VpcConfig", {})
        # Both flags are optional in the API's NetworkConfig. An absent
        # isolation flag is treated as False and an absent encryption flag as
        # None (unspecified).
        network_config = NetworkConfig(
            enable_network_isolation=network_config_dict.get("EnableNetworkIsolation", False),
            encrypt_inter_container_traffic=network_config_dict.get(
                "EnableInterContainerTrafficEncryption"
            ),
            security_group_ids=vpc_config.get("SecurityGroupIds"),
            subnets=vpc_config.get("Subnets"),
        )

    tags = sagemaker_session.list_tags(resource_arn=schedule_desc["MonitoringScheduleArn"])

    attached_monitor = cls(
        role=role,
        instance_count=instance_count,
        instance_type=instance_type,
        volume_size_in_gb=volume_size_in_gb,
        volume_kms_key=volume_kms_key,
        output_kms_key=output_kms_key,
        max_runtime_in_seconds=max_runtime_in_seconds,
        sagemaker_session=sagemaker_session,
        env=env,
        tags=tags,
        network_config=network_config,
    )
    attached_monitor.monitoring_schedule_name = monitor_schedule_name
    return attached_monitor