def attach()

in src/sagemaker/model_monitor/model_monitoring.py [0:0]


    def attach(cls, monitor_schedule_name, sagemaker_session=None):
        """Build a monitor object bound to an existing monitoring schedule.

        The schedule is described through ``sagemaker_session`` and a monitor is
        reconstructed from that description, so that subsequent
        describe_schedule or list_executions calls point to the given schedule.

        Args:
            monitor_schedule_name (str): The name of the schedule to attach to.
            sagemaker_session (sagemaker.session.Session): Session object which
                manages interactions with Amazon SageMaker APIs and any other
                AWS services needed. If not specified, one is created using
                the default AWS configuration chain.

        Returns:
            The attached monitor. When the schedule references a job definition
            by name, this is whatever ``ModelMonitor._attach`` returns for
            ``cls``; otherwise it is a new ``cls`` instance whose
            ``monitoring_schedule_name`` is set to ``monitor_schedule_name``.

        Raises:
            TypeError: If the schedule references a job definition by name and
                its monitoring type differs from ``cls.monitoring_type()``.
        """
        sagemaker_session = sagemaker_session or Session()
        schedule_desc = sagemaker_session.describe_monitoring_schedule(
            monitoring_schedule_name=monitor_schedule_name
        )
        schedule_config = schedule_desc["MonitoringScheduleConfig"]

        job_definition_name = schedule_config.get("MonitoringJobDefinitionName")
        if job_definition_name:
            # The schedule points at a standalone job definition: verify it is of
            # the type this class handles before fetching the definition.
            monitoring_type = schedule_config.get("MonitoringType")
            if monitoring_type != cls.monitoring_type():
                # Report the class attach() was actually invoked on, which is the
                # class whose monitoring_type() was just compared.
                raise TypeError(
                    "{} can only attach to Data quality monitoring schedule.".format(
                        cls.__name__
                    )
                )
            job_desc = sagemaker_session.sagemaker_client.describe_data_quality_job_definition(
                JobDefinitionName=job_definition_name
            )
            tags = sagemaker_session.list_tags(resource_arn=schedule_desc["MonitoringScheduleArn"])

            return ModelMonitor._attach(
                clazz=cls,
                sagemaker_session=sagemaker_session,
                schedule_desc=schedule_desc,
                job_desc=job_desc,
                tags=tags,
            )

        # Otherwise the job definition is embedded directly in the schedule config.
        job_definition = schedule_config["MonitoringJobDefinition"]
        role = job_definition["RoleArn"]
        cluster_config = job_definition["MonitoringResources"]["ClusterConfig"]
        instance_count = cluster_config["InstanceCount"]
        instance_type = cluster_config["InstanceType"]
        volume_size_in_gb = cluster_config["VolumeSizeInGB"]
        volume_kms_key = cluster_config.get("VolumeKmsKeyId")
        output_kms_key = job_definition["MonitoringOutputConfig"].get("KmsKeyId")
        max_runtime_in_seconds = job_definition.get("StoppingCondition", {}).get(
            "MaxRuntimeInSeconds"
        )
        # Environment is an optional field of the job definition, so a schedule
        # created without environment variables must not fail here.
        env = job_definition.get("Environment")

        network_config_dict = job_definition.get("NetworkConfig", {})
        network_config = None
        if network_config_dict:
            vpc_config = network_config_dict.get("VpcConfig", {})
            security_group_ids = vpc_config.get("SecurityGroupIds")
            subnets = vpc_config.get("Subnets")
            # Both flags are optional in the description (e.g. a NetworkConfig
            # that only carries a VpcConfig); treat a missing flag as disabled.
            network_config = NetworkConfig(
                enable_network_isolation=network_config_dict.get(
                    "EnableNetworkIsolation", False
                ),
                encrypt_inter_container_traffic=network_config_dict.get(
                    "EnableInterContainerTrafficEncryption", False
                ),
                security_group_ids=security_group_ids,
                subnets=subnets,
            )

        tags = sagemaker_session.list_tags(resource_arn=schedule_desc["MonitoringScheduleArn"])

        attached_monitor = cls(
            role=role,
            instance_count=instance_count,
            instance_type=instance_type,
            volume_size_in_gb=volume_size_in_gb,
            volume_kms_key=volume_kms_key,
            output_kms_key=output_kms_key,
            max_runtime_in_seconds=max_runtime_in_seconds,
            sagemaker_session=sagemaker_session,
            env=env,
            tags=tags,
            network_config=network_config,
        )
        attached_monitor.monitoring_schedule_name = monitor_schedule_name
        return attached_monitor