def monitor()

in google_cloud_automlops/AutoMLOps.py [0:0]


def monitor(
    target_field: str,
    model_endpoint: str,
    alert_emails: Optional[list] = None,
    auto_retraining_params: Optional[dict] = None,
    drift_thresholds: Optional[dict] = None,
    hide_warnings: Optional[bool] = True,
    job_display_name: Optional[str] = None,
    monitoring_interval: Optional[int] = 1,
    monitoring_location: Optional[str] = DEFAULT_RESOURCE_LOCATION,
    sample_rate: Optional[float] = 0.8,
    skew_thresholds: Optional[dict] = None,
    training_dataset: Optional[str] = None):
    """Creates or updates a Vertex AI Model Monitoring Job for a deployed model endpoint.
        - The predicted target field and model endpoint are required.
        - alert_emails, if specified, will send monitoring updates to the specified email(s)
        - auto_retraining_params will set up automatic retraining by creating a Log Sink and
            forwarding anomaly logs to the Pub/Sub Topic for retraining the model with the params
            specified here. If this field is left Null, the model will not be automatically
            retrained when an anomaly is detected.
        - drift_thresholds and skew_thresholds are optional, but at least 1 of them must be specified.
        - training_dataset must be specified if skew_thresholds are provided.
    Defaults are stored in config/defaults.yaml.

    The monitoring settings are written back to the generated defaults file, then
    scripts/create_model_monitoring_job.sh is run from BASE_DIR. A non-zero exit
    status from that script is logged rather than raised.

    Args:
        target_field: Prediction target column name in training dataset.
        model_endpoint: Endpoint resource name of the deployed model to monitor.
            Format: projects/{project}/locations/{location}/endpoints/{endpoint}
        alert_emails: Optional list of emails to send monitoring alerts.
            Email alerts not used if this value is set to None.
        auto_retraining_params: Pipeline parameter values to use when retraining the model.
            Defaults to None; if left None, the model will not be retrained if an alert is generated.
        drift_thresholds: Compares incoming data to data previously seen to check for drift.
        hide_warnings: Boolean that specifies whether to hide permissions warnings before
            monitoring. Warnings are shown only when this is set to False.
        job_display_name: Display name of the ModelDeploymentMonitoringJob. The name can be up to
            128 characters long and can consist of any UTF-8 characters. Defaults to
            '<naming_prefix>-model-monitoring-job' when left None.
        monitoring_interval: Configures model monitoring job scheduling interval in hours.
            This defines how often the monitoring jobs are triggered.
        monitoring_location: Location to retrieve ModelDeploymentMonitoringJob from.
        sample_rate: Used for drift detection, specifies what percent of requests to the endpoint
            are randomly sampled for drift detection analysis. This value must range between (0, 1].
        skew_thresholds: Compares incoming data to the training dataset to check for skew.
        training_dataset: Training dataset used to train the deployed model. This field is required if
            using skew detection.

    Raises:
        ValueError: If neither skew_thresholds nor drift_thresholds is provided, if
            skew_thresholds is provided without training_dataset, or if
            setup_model_monitoring was not enabled in the generated defaults.
    """
    if not skew_thresholds and not drift_thresholds:
        raise ValueError('skew_thresholds and drift_thresholds cannot both be None.')
    elif skew_thresholds and not training_dataset:
        raise ValueError('training_dataset must be set to use skew_thresholds.')

    defaults = read_yaml_file(GENERATED_DEFAULTS_FILE)
    if not defaults['gcp']['setup_model_monitoring']:
        raise ValueError('Parameter setup_model_monitoring in .generate() must be set to True to use .monitor()')
    if not hide_warnings:
        account_permissions_warning(operation='model_monitoring', defaults=defaults)

    naming_prefix = defaults['gcp']['naming_prefix']
    derived_job_display_name = (
        f'{naming_prefix}-model-monitoring-job' if job_display_name is None else job_display_name)
    derived_log_sink_name = f'{naming_prefix}-model-monitoring-log-sink'
    gs_auto_retraining_params_path = (
        f'''gs://{defaults['gcp']['storage_bucket_name']}/pipeline_root/'''
        f'''{naming_prefix}/automatic_retraining_parameters.json''')

    # Persist every monitoring setting so the shell script below can read it
    # from the regenerated defaults file.
    defaults['monitoring'].update({
        'target_field': target_field,
        'model_endpoint': model_endpoint,
        'alert_emails': alert_emails,
        'auto_retraining_params': auto_retraining_params,
        'drift_thresholds': drift_thresholds,
        'gs_auto_retraining_params_path': gs_auto_retraining_params_path,
        'job_display_name': derived_job_display_name,
        'log_sink_name': derived_log_sink_name,
        'monitoring_interval': monitoring_interval,
        'monitoring_location': monitoring_location,
        'sample_rate': sample_rate,
        'skew_thresholds': skew_thresholds,
        'training_dataset': training_dataset,
    })

    # Rewrite the file from scratch: header first, then the YAML body appended.
    write_file(GENERATED_DEFAULTS_FILE, DEFAULTS_HEADER, 'w')
    write_yaml_file(GENERATED_DEFAULTS_FILE, defaults, 'a')

    # Run the script with BASE_DIR as the child's working directory instead of
    # chdir-ing this process: the caller's cwd is never modified, so it cannot
    # be left pointing at BASE_DIR if the launch raises unexpectedly.
    try:
        subprocess.run('./scripts/create_model_monitoring_job.sh', shell=True,
                       check=True, stderr=subprocess.STDOUT, cwd=BASE_DIR)
    except subprocess.CalledProcessError as e:
        logging.info(e) # script failure is logged, not raised