in source/lib/blueprints/byom/model_monitor.py [0:0]
def __init__(self, scope: core.Construct, id: str, monitoring_type: str, **kwargs) -> None:
    """Define a Model Monitor blueprint stack for one SageMaker monitoring type.

    Creates the CloudFormation parameters/conditions shared by all monitoring
    types, imports the assets/blueprint S3 buckets, invokes the baseline-job
    lambda via a custom resource, creates the SageMaker execution role and the
    monitoring schedule, and wires the dependencies between them.

    Args:
        scope: Parent CDK construct.
        id: Logical ID of this construct within the parent scope.
        monitoring_type: One of "DataQuality", "ModelQuality", "ModelBias",
            or "ModelExplainability"; selects which extra parameters and
            attributes are added.
        **kwargs: Forwarded unchanged to the parent construct's constructor.

    Raises:
        ValueError: If monitoring_type is not one of the four supported values.
    """
    super().__init__(scope, id, **kwargs)
    # Fail fast on an unsupported monitoring_type before creating any resources.
    if monitoring_type not in ["DataQuality", "ModelQuality", "ModelBias", "ModelExplainability"]:
        raise ValueError(
            (
                f"The {monitoring_type} is not valid. Supported Monitoring Types are: "
                f"'DataQuality'|'ModelQuality'|'ModelBias'|'ModelExplainability'"
            )
        )
    # Baseline/Monitor keyword-argument dicts; populated incrementally below
    # (common attributes first, then type-specific ones) and finally passed to
    # the baseline custom resource and SageMakerModelMonitor respectively.
    self.baseline_attributes = dict()
    self.monitor_attributes = dict()
    # Parameters #
    # CloudFormation template parameters created via the `pf` helper module.
    # NOTE: the creation order here determines the parameter order in the
    # rendered template, so keep it stable.
    self.monitoring_type = monitoring_type
    self.blueprint_bucket_name = pf.create_blueprint_bucket_name_parameter(self)
    self.assets_bucket_name = pf.create_assets_bucket_name_parameter(self)
    self.endpoint_name = pf.create_endpoint_name_parameter(self)
    self.baseline_job_output_location = pf.create_baseline_job_output_location_parameter(self)
    self.baseline_data = pf.create_baseline_data_parameter(self)
    self.instance_type = pf.create_instance_type_parameter(self)
    self.instance_count = pf.create_instance_count_parameter(self)
    self.instance_volume_size = pf.create_instance_volume_size_parameter(self)
    self.baseline_max_runtime_seconds = pf.create_baseline_max_runtime_seconds_parameter(self)
    # NOTE(review): "ModelQuality" is hardcoded here even when this stack is
    # built for ModelBias/ModelExplainability/DataQuality — presumably the
    # helper only uses it to pick a max-runtime constraint shared by all
    # types, but confirm this is intentional rather than passing
    # self.monitoring_type. TODO confirm against parameters_factory.
    self.monitor_max_runtime_seconds = pf.create_monitor_max_runtime_seconds_parameter(self, "ModelQuality")
    self.kms_key_arn = pf.create_kms_key_arn_parameter(self)
    self.baseline_job_name = pf.create_baseline_job_name_parameter(self)
    self.monitoring_schedule_name = pf.create_monitoring_schedule_name_parameter(self)
    self.data_capture_bucket = pf.create_data_capture_bucket_name_parameter(self)
    self.baseline_output_bucket = pf.create_baseline_output_bucket_name_parameter(self)
    self.data_capture_s3_location = pf.create_data_capture_location_parameter(self)
    self.monitoring_output_location = pf.create_monitoring_output_location_parameter(self)
    self.schedule_expression = pf.create_schedule_expression_parameter(self)
    self.image_uri = pf.create_algorithm_image_uri_parameter(self)
    # common conditions
    # Condition used to conditionally attach KMS permissions/encryption when a
    # key ARN was supplied by the caller.
    self.kms_key_arn_provided = cf.create_kms_key_arn_provided_condition(self, self.kms_key_arn)
    # Resources #
    # Import (not create) the existing assets bucket by name.
    self.assets_bucket = s3.Bucket.from_bucket_name(
        self, "ImportedAssetsBucket", self.assets_bucket_name.value_as_string
    )
    # getting blueprint bucket object from its name - will be used later in the stack
    self.blueprint_bucket = s3.Bucket.from_bucket_name(
        self, "ImportedBlueprintBucket", self.blueprint_bucket_name.value_as_string
    )
    # update common Baseline attributes
    self._update_common_baseline_attributes()
    # add ModelQuality specific parameters/conditions, and update self.baseline_attributes/self.monitor_attributes
    # (ModelBias/ModelExplainability reuse the ModelQuality parameters, hence
    # the three-type membership test)
    if self.monitoring_type in ["ModelQuality", "ModelBias", "ModelExplainability"]:
        self._add_model_quality_resources()
    # add extra ModelBias/ModelExplainability attributes on top of the
    # ModelQuality ones added above
    if self.monitoring_type in ["ModelBias", "ModelExplainability"]:
        self._add_model_bias_explainability_extra_attributes()
    # create custom resource to invoke the baseline job lambda
    invoke_lambda_custom_resource = self._create_invoke_lambda_custom_resource()
    # creating SageMaker monitor role
    self.sagemaker_role = self._create_sagemaker_monitor_role()
    # update attributes (monitor attributes need the role created just above)
    self._update_common_monitor_attributes()
    # create SageMaker monitoring Schedule
    sagemaker_monitor = SageMakerModelMonitor(self, f"{monitoring_type}Monitor", **self.monitor_attributes)
    # add job definition dependency on sagemaker role and invoke_lambda_custom_resource
    # (so, the baseline job is created before the monitor's job definition)
    sagemaker_monitor.job_definition.node.add_dependency(self.sagemaker_role)
    sagemaker_monitor.job_definition.node.add_dependency(invoke_lambda_custom_resource)
    # Outputs #
    self._create_stack_outputs()