python/pipelines/components/vertex/component.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional
import os, logging, yaml, toml
from kfp.dsl import component, Output, Artifact, Model, Input, Metrics, ClassificationMetrics, Dataset
from ma_components.vertex import VertexModel
pyproject_toml_file_path = os.path.join(os.path.dirname(__file__), '../../../../pyproject.toml')
config_file_path = os.path.join(os.path.dirname(__file__), '../../../../config/config.yaml')
packages_to_install = []
if os.path.exists(pyproject_toml_file_path):
dependencies = toml.load(pyproject_toml_file_path)['tool']['poetry']['group']['component_vertex']['dependencies']
for k,v in dependencies.items():
packages_to_install.append(f"{k}=={v}")
target_image=None
if os.path.exists(config_file_path):
with open(config_file_path, encoding='utf-8') as fh:
configs = yaml.full_load(fh)
vertex_components_params = configs['vertex_ai']['components']
repo_params = configs['artifact_registry']['pipelines_docker_repo']
#target_image = f"{repo_params['region']}-docker.pkg.dev/{repo_params['project_id']}/{repo_params['name']}/{vertex_components_params['image_name']}:{vertex_components_params['tag']}"
base_image = f"{repo_params['region']}-docker.pkg.dev/{repo_params['project_id']}/{repo_params['name']}/{vertex_components_params['base_image_name']}:{vertex_components_params['base_image_tag']}"
@component(
base_image=base_image,
#target_image=target_image,
#packages_to_install=packages_to_install
)
def elect_best_tabular_model(
project: str,
location: str,
display_name: str,
metric_name: str,
metric_threshold: float,
number_of_models_considered: int,
metrics_logger: Output[Metrics],
classification_metrics_logger: Output[ClassificationMetrics],
elected_model: Output[VertexModel]
) -> None:
"""Vertex pipelines component that elects the best model based on some
criteria such as metric, minimum accepted threshold and number of
last models to compare. The compoenent uses google-cloud-aiplatform Model.get_model_evaluation()
to retrieve evaluation results. It compares against models with the same name or different versions within
the same model. For models with multiple versions, the best version becomes the default version.
Args:
project (str):
Project to retrieve models and model registry from
location (str):
Location to retrieve models and model registry from
display_name (str):
The display name of the model for which selection is going to be made
metric_name (str):
The name of the metic based on which the model will be evaluated
metric_threshold (float):
The minimum or maximum (depended on metrci) accepted value of the selected metric. If the metric value is below or above this threashold, the model will not be selected. For logLoss this value should be max accepted logLoss. For auROC and auPR this value should be minumum accepted.
number_of_models_considered (int):
Defines the number of latest models to be considered for selection. If you always want the last model then this value should be 1
Returns:
Raises:
ValueError: given metric_goal not a valid MetricGoal
ValueError: Model evaluation metric {metric_name} of value {best_model[metric_name]} does not meet minumum criteria of threshold {metric_threshold}
"""
from google.cloud import aiplatform as aip
import logging
from pprint import pformat
from enum import Enum
#from google_cloud_pipeline_components.types.artifact_types import VertexModel
class MetricsEnum(Enum):
# classification
LOG_LOSS = 'logLoss'
AU_ROC = 'auRoc'
AU_PRC = 'auPrc'
# regression
MAE = 'meanAbsoluteError'
MAPE = 'meanAbsolutePercentageError'
RMSE = 'rootMeanSquaredError'
RMSLE = 'rootMeanSquaredLogError'
R2 = 'rSquared'
def is_new_metric_better(self, new_value: float, old_value: float):
return new_value<old_value if self.name in [MetricsEnum.LOG_LOSS.name, MetricsEnum.MAE.name, MetricsEnum.MAPE.name, MetricsEnum.RMSE.name, MetricsEnum.RMSLE.name] else new_value>old_value
@classmethod
def list(cls):
return list(map(lambda c: c.value, cls))
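# For clarity: lower values win for the error metrics (logLoss, MAE, MAPE, RMSE,
# RMSLE) and higher values win otherwise, e.g.
# MetricsEnum('logLoss').is_new_metric_better(0.3, 0.5) is True, while
# MetricsEnum('auRoc').is_new_metric_better(0.7, 0.9) is False.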
logging.info(display_name)
aip.init(project=project, location=location)
models = aip.Model.list(filter=f'display_name="{display_name}"')
models_versions_to_compare = []
# find the X (number_of_models_considered) latest models based on created date of models and versions
counter = 0
for model in models:
model_registry = aip.ModelRegistry(model=model.name)
for v in model_registry.list_versions():
if counter < number_of_models_considered:
models_versions_to_compare.append(v)
counter += 1
else:
candidate_v = v
for idx, mv in enumerate(models_versions_to_compare):
if candidate_v.version_create_time.timestamp() > mv.version_create_time.timestamp(): # checks if the current candidate is newer than one already in the list
tmp = mv
models_versions_to_compare[idx] = candidate_v
candidate_v = tmp
if len(models_versions_to_compare)==0:
raise Exception(f"No models in vertex model registry match '{display_name}'")
best_model = dict()
best_eval_metrics = dict()
for model_version in models_versions_to_compare:
logging.info(f"{model_version.model_resource_name} @ {model_version.version_id}")
model = aip.Model(model_name=f"{model_version.model_resource_name}@{model_version.version_id}")
evaluation = model.get_model_evaluation() # returns data from the latest evaluation job
if (metric_name not in best_model) or MetricsEnum(metric_name).is_new_metric_better(evaluation.metrics[metric_name], best_model[metric_name]):
best_eval_metrics = evaluation.metrics
best_model[metric_name] = best_eval_metrics[metric_name]
best_model["resource_name"] = model.resource_name
best_model["display_name"] = model.display_name
best_model["version"] = model.version_id
logging.info(f"New Model/Version elected | name: {model.resource_name} | version {model.version_id} | metric name: {metric_name} | metric value: {best_eval_metrics[metric_name]} ")
if MetricsEnum(metric_name).is_new_metric_better(metric_threshold, best_model[metric_name]):
raise ValueError(f"Model evaluation metric {metric_name} of value {best_model[metric_name]} does not meet minumum criteria of threshold {metric_threshold}")
# Make the best version of the best model the default version
aip.ModelRegistry(model=best_model["resource_name"]).add_version_aliases(['default'], best_model["version"])
#fpr_arr = []
#tpr_arr = []
#th_arr = []
for k,v in best_eval_metrics.items():
if k in MetricsEnum.list():
logging.info(f"Metrics Logger: Model Evaluation Metric name {k} and value {v}")
def _isnan(value):
try:
import math
return math.isnan(float(value))
except (TypeError, ValueError):
return False
if _isnan(v):
v = "0"
metrics_logger.log_metric(k, v)
elif k == 'confidenceMetrics':
for confidence_metrics in v:
if 'confidenceThreshold' in confidence_metrics :
if 0 <= confidence_metrics['confidenceThreshold'] <= 1:
classification_metrics_logger.log_roc_data_point(
float(confidence_metrics['falsePositiveRate']) if 'falsePositiveRate' in confidence_metrics else 0,
float(confidence_metrics['recall']) if 'recall' in confidence_metrics else 0,
float(confidence_metrics['confidenceThreshold'])
)
if 'confusionMatrix' in confidence_metrics and 0.48 < confidence_metrics['confidenceThreshold'] < 0.52:
confusion_m = confidence_metrics['confusionMatrix']
classification_metrics_logger.log_confusion_matrix(
[spec['displayName'] for spec in confusion_m['annotationSpecs']],
confusion_m['rows'])
#th_arr.append(float(confidence_metrics['confidenceThreshold']))
#fpr_arr.append(float(confidence_metrics['falsePositiveRate']) if 'falsePositiveRate' in confidence_metrics else 0)
#tpr_arr.append(float(confidence_metrics['recall']) if 'recall' in confidence_metrics else 0)
#elected_model.name=best_model["display_name"]
elected_model.uri = f"https://{location}-aiplatform.googleapis.com/v1/{best_model['resource_name']}/versions/{best_model['version']}"
elected_model.metadata = {
'resourceName': best_model["resource_name"],
'version': best_model["version"],
}
elected_model.schema_title = 'google.VertexModel'
#classification_metrics_logger.log_roc_curve(fpr_arr,tpr_arr, th_arr)
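# A minimal sketch of how this component might be wired into a KFP pipeline; the
# pipeline name, project, display name and threshold below are hypothetical:
#
#   from kfp import dsl
#
#   @dsl.pipeline(name="elect-best-model")
#   def training_pipeline():
#       elect_task = elect_best_tabular_model(
#           project="my-gcp-project",              # hypothetical
#           location="us-central1",
#           display_name="my-tabular-model",       # hypothetical
#           metric_name="auRoc",
#           metric_threshold=0.75,
#           number_of_models_considered=5,
#       )
#       # elect_task.outputs["elected_model"] can then feed e.g. batch_prediction below.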
@component(
base_image=base_image,
#target_image=target_image,
#packages_to_install=packages_to_install
)
def get_latest_model(
project: str,
location: str,
display_name: str,
elected_model: Output[VertexModel]
) -> None:
"""Vertex pipelines component that elects the latest model based on the display name.
Args:
project (str):
Project to retrieve models and model registry from
location (str):
Location to retrieve models and model registry from
display_name (str):
The display name of the model for which selection is going to be made
elected_model: Output[VertexModel]:
The output VertexModel object containing the latest model information.
Raises:
Exception: If no models are found in the vertex model registry that match the display name.
"""
from google.cloud import aiplatform as aip
import logging
from pprint import pformat
from enum import Enum
#from google_cloud_pipeline_components.types.artifact_types import VertexModel
class MetricsEnum(Enum):
LOG_LOSS = 'logLoss'
AU_ROC = 'auRoc'
AU_PRC = 'auPrc'
def is_new_metric_better(self, new_value: float, old_value: float):
return new_value<old_value if self.name == MetricsEnum.LOG_LOSS.name else new_value>old_value
@classmethod
def list(cls):
return list(map(lambda c: c.value, cls))
number_of_models_considered: int = 1
logging.info(display_name)
aip.init(project=project, location=location)
models = aip.Model.list(filter=f'display_name="{display_name}"', order_by="create_time desc")
models_versions_to_compare = []
# find the X (number_of_models_considered) latest models based on created date of models and versions
counter = 0
for model in models:
model_registry = aip.ModelRegistry(model=model.name)
for v in model_registry.list_versions():
if counter < number_of_models_considered:
models_versions_to_compare.append(v)
counter += 1
else:
candidate_v = v
for idx, mv in enumerate(models_versions_to_compare):
if candidate_v.version_create_time.timestamp() > mv.version_create_time.timestamp(): # checks if the current candidate is newer than one already in the list
tmp = mv
models_versions_to_compare[idx] = candidate_v
candidate_v = tmp
if len(models_versions_to_compare) == 0:
raise Exception(f"No models in vertex model registry match '{display_name}'")
model = models_versions_to_compare[0]
logging.info(f"Selected model : {model}")
aip.ModelRegistry(model=model.model_resource_name).add_version_aliases(['default'], model.version_id)
elected_model.uri = f"https://{location}-aiplatform.googleapis.com/v1/{model.model_resource_name}/versions/{model.version_id}"
elected_model.metadata = {
'resourceName': model.model_resource_name,
'version': model.version_id,
}
elected_model.schema_title = 'google.VertexModel'
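# Note: unlike elect_best_tabular_model above, this component ignores evaluation
# metrics and simply promotes the most recently created model version to 'default'.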
@component(base_image=base_image)
def batch_prediction(
destination_table: Output[Dataset],
bigquery_source: str,
bigquery_destination_prefix: str,
job_name_prefix: str,
model: Input[VertexModel],
machine_type: str = "n1-standard-2",
max_replica_count: int = 10,
batch_size: int = 64,
accelerator_count: int = 0,
accelerator_type: str = None,
generate_explanation: bool = False,
dst_table_expiration_hours: int = 0
):
"""Vertex pipelines component that performs batch prediction using a Vertex AI model.
Args:
destination_table (Output[Dataset]):
The output BigQuery table where the predictions will be stored.
bigquery_source (str):
The BigQuery table containing the data to be predicted.
bigquery_destination_prefix (str):
The BigQuery table prefix where the predictions will be stored.
job_name_prefix (str):
The prefix for the batch prediction job name.
model (Input[VertexModel]):
The Vertex AI model to be used for prediction.
machine_type (str, optional):
The machine type to use for the batch prediction job. Defaults to "n1-standard-2".
max_replica_count (int, optional):
The maximum number of replicas to use for the batch prediction job. Defaults to 10.
batch_size (int, optional):
The batch size to use for the batch prediction job. Defaults to 64.
accelerator_count (int, optional):
The number of accelerators to use for the batch prediction job. Defaults to 0.
accelerator_type (str, optional):
The type of accelerators to use for the batch prediction job. Defaults to None.
generate_explanation (bool, optional):
Whether to generate explanations for the predictions. Defaults to False.
dst_table_expiration_hours (int, optional):
The number of hours after which the destination table will expire. Defaults to 0.
Raises:
Exception: If the batch prediction job fails.
"""
from datetime import datetime, timedelta, timezone
import logging
from google.cloud import bigquery
from google.cloud.aiplatform import Model
model = Model(f"{model.metadata['resourceName']}@{model.metadata['version']}")
timestamp = str(int(datetime.now().timestamp()))
batch_prediction_job = model.batch_predict(
job_display_name=f"{job_name_prefix}-{timestamp}",
instances_format="bigquery",
predictions_format="bigquery",
bigquery_source=f"bq://{bigquery_source}",
bigquery_destination_prefix=f"bq://{bigquery_destination_prefix}",
machine_type=machine_type,
max_replica_count=max_replica_count,
batch_size=batch_size,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
generate_explanation=generate_explanation
)
batch_prediction_job.wait()
# Filling the destination_table with the bigquery destination table.
destination_table.metadata["table_id"] = f"{batch_prediction_job.to_dict()['outputInfo']['bigqueryOutputDataset'].replace('bq://','')}.{batch_prediction_job.to_dict()['outputInfo']['bigqueryOutputTable']}"
destination_table.metadata["predictions_column_prefix"] = "predicted_"
destination_table.metadata["predictions_column"] = "prediction"
destination_table.metadata["predictions_prob_column"] = "prediction_prob"
if dst_table_expiration_hours > 0:
client = bigquery.Client(project=model.project)
table = client.get_table(destination_table.metadata["table_id"])
expiration = datetime.now(timezone.utc) + timedelta(
hours=dst_table_expiration_hours
)
table.expires = expiration
client.update_table(table, ["expires"])
logging.info(batch_prediction_job.to_dict())
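# Sketch of a downstream component that could consume the destination_table
# artifact produced above; the component name and query are hypothetical:
#
#   @component(base_image=base_image)
#   def count_predictions(predictions: Input[Dataset]) -> None:
#       from google.cloud import bigquery
#       client = bigquery.Client()
#       table_id = predictions.metadata["table_id"]
#       # Count rows in the batch prediction output table
#       row = list(client.query(f"SELECT COUNT(*) AS n FROM `{table_id}`").result())[0]
#       print(row.n)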
@component(base_image=base_image)
# Note currently KFP SDK doesn't support outputting artifacts in `google` namespace.
# Use the base type dsl.Artifact instead.
def return_unmanaged_model(
image_uri: str,
bucket_name: str,
model_name: str,
model: Output[Artifact]
) -> None:
"""Vertex pipelines component that returns an unmanaged model artifact.
Args:
image_uri (str):
The URI of the container image for the unmanaged model.
bucket_name (str):
The name of the Google Cloud Storage bucket where the unmanaged model is stored.
model_name (str):
The name of the unmanaged model file in the Google Cloud Storage bucket.
model (Output[Artifact]):
The output artifact (dsl.Artifact) carrying the container image spec and the GCS model URI.
Raises:
Exception: If the model artifact cannot be created.
"""
from google_cloud_pipeline_components import v1
from google_cloud_pipeline_components.types import artifact_types
from kfp import dsl
model_uri = f"gs://{bucket_name}/{model_name}"
model.metadata['containerSpec'] = {'imageUri': image_uri}
model.uri = model_uri
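# A typical (hypothetical) next step is to register this unmanaged container model
# with Vertex AI, e.g. via google_cloud_pipeline_components.v1.model.ModelUploadOp,
# passing the artifact above as `unmanaged_container_model`; the exact wiring
# depends on the google-cloud-pipeline-components version in use.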
# Create tabular model explanation component
@component(base_image=base_image)
def get_tabular_model_explanation(
project: str,
location: str,
model: Input[VertexModel],
model_explanation: Output[Dataset],
) -> None:
"""Vertex pipelines component that retrieves tabular model explanations from the AutoML API.
Args:
project (str):
Project to retrieve models and model registry from
location (str):
Location to retrieve models and model registry from
model (Input[VertexModel]):
The Vertex AI model for which explanations will be retrieved.
model_explanation (Output[Dataset]):
The output Dataset artifact whose metadata carries the extracted feature attributions (feature names and values).
Raises:
Exception: If the model explanations cannot be retrieved.
"""
from google.cloud import aiplatform
import logging
import re
# Get explanations from the AutoML API
aiplatform.init(project=project, location=location)
model = aiplatform.Model(model.metadata["resourceName"]) # example: 'projects/mde-aggregated-vbb/locations/us-central1/models/1391715638950494208' # replace with your model id
model_evals = model.api_client.select_version('v1beta1').list_model_evaluations(parent=model.resource_name)
#Get model id
model_id = model.resource_name.split('/')[-1]
#Get model name
model_name = model.display_name
#Get model version
model_version = model.version_id
logging.info(model_version)
# Convert the evaluations pager into a list of model explanation payloads
modelEvalJson = [val.model_explanation for val in model_evals]
modelEvalJson = modelEvalJson[0] # keep only the first evaluation's explanation
# Parse the explanation payload, keeping feature names as keys and attribution values as values
# Extract both into two parallel lists using regular expressions
feature_names = re.findall(r'(?<=key: ").*?(?=")', str(modelEvalJson))
values = re.findall(r"(?<=number_value: )[0-9]+\.[0-9]+", str(modelEvalJson))
# Log the extracted feature names and values
logging.info(feature_names)
logging.info(values)
# Pair feature names with their values (kept for potential BigQuery insertion; the component output below uses the two lists directly)
rows_to_insert = [
dict(zip(['feature_names', 'values'], row))
for row in zip(feature_names, values)
]
# Component output
model_explanation.metadata = {
'model_id': model_id,
'model_name': model_name,
'model_version': model_version,
'model_uri': f"https://{location}-aiplatform.googleapis.com/v1/{model.resource_name}/versions/{model.version_id}",
'feature_names': feature_names,
'values': values,
}
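# For illustration, the resulting model_explanation.metadata might look like the
# following (all values hypothetical):
#
#   {
#       'model_id': '1391715638950494208',
#       'model_name': 'my-tabular-model',
#       'model_version': '3',
#       'model_uri': 'https://us-central1-aiplatform.googleapis.com/v1/projects/.../models/1391715638950494208/versions/3',
#       'feature_names': ['feature_a', 'feature_b'],
#       'values': ['0.42', '0.13'],
#   }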