in src/demo_data_quality_model_monitor.py [0:0]
def create_data_quality_monitor(self):
    """Baseline the training dataset and schedule an hourly data-quality monitor.

    Steps, in order:
      1. Derive the S3 locations for monitoring reports and baseline results
         from ``self.bucket`` / ``self.prefix`` and store them on
         ``self.data_quality_s3_report_path`` and
         ``self.data_quality_baseline_results_uri``.
      2. Build a ``DefaultModelMonitor`` and run ``suggest_baseline`` on
         ``self.training_dataset_path`` (CSV with header), blocking until done.
      3. Best-effort delete any monitoring schedule with the target name.
      4. Create the schedule against ``self.endpoint_name`` using the baseline
         statistics and suggested constraints from step 2.

    Reads from ``self``: ``prefix``, ``bucket``, ``role``, ``network_config``,
    ``tags``, ``kms_kwargs``, ``training_dataset_path``, ``endpoint_name``,
    ``sm_client``, ``record_preprocessor_script`` and
    ``post_analytics_processor_script``.

    Returns:
        The ``DefaultModelMonitor`` instance the schedule was created with.
    """
    reports_prefix = '{}/data_quality/reports'.format(self.prefix)
    self.data_quality_s3_report_path = 's3://{}/{}'.format(self.bucket, reports_prefix)

    data_quality_prefix = self.prefix + '/data_quality'
    data_quality_baseline_prefix = data_quality_prefix + '/baselining'
    data_quality_baseline_results_prefix = data_quality_baseline_prefix + '/results'
    self.data_quality_baseline_results_uri = 's3://{}/{}'.format(
        self.bucket, data_quality_baseline_results_prefix
    )
    print('Baseline results uri: {}'.format(self.data_quality_baseline_results_uri))

    my_default_monitor = DefaultModelMonitor(
        role=self.role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        volume_size_in_gb=20,
        max_runtime_in_seconds=3600,
        network_config=self.network_config,
        tags=self.tags,
        **self.kms_kwargs
    )

    # wait=True: the baseline results are read back below, so block until
    # the baselining job has finished.
    my_default_monitor.suggest_baseline(
        baseline_dataset=self.training_dataset_path,
        dataset_format=DatasetFormat.csv(header=True),
        output_s3_uri=self.data_quality_baseline_results_uri,
        wait=True
    )

    # HACK: replaces the monitor's private network-config validator with a
    # no-op on this instance only.
    # NOTE(review): presumably needed so the schedule accepts
    # self.network_config; this relies on a private SDK attribute -- confirm
    # it is still required for the SDK version in use.
    my_default_monitor._validate_network_config = lambda network_config_dict: None

    # Truncate to 60 characters so the name with its '-dq' suffix is at most
    # 63 characters (presumably the service's name-length limit -- confirm).
    mon_schedule_name = self.endpoint_name[:60] + '-dq'

    # Best-effort cleanup of a previous schedule with the same name. A failure
    # here (most likely: no such schedule yet) must not abort the setup, but
    # it is reported rather than silently discarded so that real problems
    # such as missing permissions are visible.
    try:
        self.sm_client.delete_monitoring_schedule(MonitoringScheduleName=mon_schedule_name)
    except Exception as exc:
        print('Could not delete monitoring schedule {} (continuing): {}'.format(
            mon_schedule_name, exc
        ))

    my_default_monitor.create_monitoring_schedule(
        monitor_schedule_name=mon_schedule_name,
        endpoint_input=self.endpoint_name,
        output_s3_uri=self.data_quality_s3_report_path,
        statistics=my_default_monitor.baseline_statistics(),
        constraints=my_default_monitor.suggested_constraints(),
        schedule_cron_expression=CronExpressionGenerator.hourly(),
        record_preprocessor_script=self.record_preprocessor_script,
        post_analytics_processor_script=self.post_analytics_processor_script,
        enable_cloudwatch_metrics=True,
    )
    print(mon_schedule_name)
    return my_default_monitor