def attach()

in src/sagemaker/model_monitor/model_monitoring.py [0:0]


    def attach(cls, monitor_schedule_name, sagemaker_session=None):
        """Set this object's schedule name point to the Amazon Sagemaker Monitoring Schedule name.

        This allows subsequent describe_schedule or list_executions calls to point
        to the given schedule.

        Args:
            monitor_schedule_name (str): The name of the schedule to attach to.
            sagemaker_session (sagemaker.session.Session): Session object which
                manages interactions with Amazon SageMaker APIs and any other
                AWS services needed. If not specified, one is created using
                the default AWS configuration chain.

        """
        sagemaker_session = sagemaker_session or Session()
        schedule_desc = sagemaker_session.describe_monitoring_schedule(
            monitoring_schedule_name=monitor_schedule_name
        )

        monitoring_job_definition = schedule_desc["MonitoringScheduleConfig"][
            "MonitoringJobDefinition"
        ]
        role = monitoring_job_definition["RoleArn"]
        image_uri = monitoring_job_definition["MonitoringAppSpecification"].get("ImageUri")
        cluster_config = monitoring_job_definition["MonitoringResources"]["ClusterConfig"]
        instance_count = cluster_config.get("InstanceCount")
        instance_type = cluster_config["InstanceType"]
        volume_size_in_gb = cluster_config["VolumeSizeInGB"]
        volume_kms_key = cluster_config.get("VolumeKmsKeyId")
        entrypoint = monitoring_job_definition["MonitoringAppSpecification"].get(
            "ContainerEntrypoint"
        )
        output_kms_key = monitoring_job_definition["MonitoringOutputConfig"].get("KmsKeyId")
        network_config_dict = monitoring_job_definition.get("NetworkConfig")

        max_runtime_in_seconds = None
        stopping_condition = monitoring_job_definition.get("StoppingCondition")
        if stopping_condition:
            max_runtime_in_seconds = stopping_condition.get("MaxRuntimeInSeconds")

        env = monitoring_job_definition.get("Environment", None)

        vpc_config = None
        if network_config_dict:
            vpc_config = network_config_dict.get("VpcConfig")

        security_group_ids = None
        if vpc_config:
            security_group_ids = vpc_config["SecurityGroupIds"]

        subnets = None
        if vpc_config:
            subnets = vpc_config["Subnets"]

        network_config = None
        if network_config_dict:
            network_config = NetworkConfig(
                enable_network_isolation=network_config_dict["EnableNetworkIsolation"],
                encrypt_inter_container_traffic=network_config_dict[
                    "EnableInterContainerTrafficEncryption"
                ],
                security_group_ids=security_group_ids,
                subnets=subnets,
            )

        tags = sagemaker_session.list_tags(resource_arn=schedule_desc["MonitoringScheduleArn"])

        attached_monitor = cls(
            role=role,
            image_uri=image_uri,
            instance_count=instance_count,
            instance_type=instance_type,
            entrypoint=entrypoint,
            volume_size_in_gb=volume_size_in_gb,
            volume_kms_key=volume_kms_key,
            output_kms_key=output_kms_key,
            max_runtime_in_seconds=max_runtime_in_seconds,
            sagemaker_session=sagemaker_session,
            env=env,
            tags=tags,
            network_config=network_config,
        )
        attached_monitor.monitoring_schedule_name = monitor_schedule_name
        return attached_monitor