tools/vertexai-featurestore-migration-kit/legacy_exporter.py
# -*- coding: utf-8 -*-
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Legacy Feature Store Exporter"""
import os
import csv
from typing import List, Tuple
from datetime import datetime, timezone
import yaml
from google.cloud import aiplatform_v1
from logging_config import logger
from utils import create_dataset_if_not_exists, poll_operation, rename_bigquery_column, update_bq_column_descriptions
PROJECT_ROOT = os.path.abspath(os.path.dirname(__file__))
CONFIG_FILE = os.path.join(PROJECT_ROOT, 'config', 'config.yaml')
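# Illustrative config.yaml shape, inferred from the keys this module reads
# (values are placeholders, not defaults):
#
#   project_id: my-project
#   region: us-central1
#   bq_dataset_prefix: export_
#   bq_table_prefix: ""
#   legacy_feature_store:
#     feature_store_mode: list          # or "all"
#     feature_stores:
#       - name: my_feature_store
#         entity_type_mode: list        # or "all"
#         entity_types:
#           - name: users
#             mapping_file: users_mapping.csv   # optional, resolved under config/
#             entity_id_column: user_id         # optional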
class LegacyExporter:
"""Legacy Feature Store Exporter"""
def __init__(self):
"""Initialize the LegacyExporter class."""
logger.info("LegacyExporter initialized")
self.config = self.read_config(CONFIG_FILE)
self.project_id = self.config.get("project_id")
self.region = self.config.get("region")
self.dataset_id_prefix = self.config.get("bq_dataset_prefix", "")
self.table_id_prefix = self.config.get("bq_table_prefix", "")
        self.feature_store_mode = self.config.get("legacy_feature_store", {}).get("feature_store_mode")
self.client = aiplatform_v1.FeaturestoreServiceClient(
client_options={"api_endpoint": f"{self.region}-aiplatform.googleapis.com"})
@staticmethod
def read_config(config_file):
"""
Read the configuration file.
Args:
config_file (str): The path to the configuration file.
Returns:
dict: The configuration loaded from the file.
"""
with open(config_file, "r", encoding="utf-8") as stream:
config = yaml.safe_load(stream)
return config
def list_features(self, featurestore_id: str, entity_type_id: str) -> List[dict]:
"""
Lists features in a specific entity type within a Vertex AI Featurestore.
Args:
featurestore_id (str): The ID of the Featurestore.
entity_type_id (str): The ID of the entity type within the Featurestore.
Returns:
List[dict]: A list of feature dictionaries, each containing the name, description, and labels of the feature.
"""
parent = self.client.entity_type_path(
project=self.project_id, location=self.region, featurestore=featurestore_id, entity_type=entity_type_id
)
request = aiplatform_v1.ListFeaturesRequest(
parent=parent,
)
page_result = self.client.list_features(request=request)
features = []
for response in page_result:
features.append(
dict(name=response.name.split('/')[-1],
description=response.description,
labels=dict(response.labels))
)
return features
def list_entity_types(self, feature_store_id) -> Tuple[str, List[dict]]:
"""
Loads the list of entity types specified in the configuration or lists all entity types for
a given feature store.
Args:
feature_store_id (str): The ID of the feature store.
Returns:
Tuple[str, List[dict]]: The entity type mode and a list of entity type dictionaries.
"""
logger.debug(f"Listing entity types for FeatureStore: {feature_store_id}")
if self.feature_store_mode == 'all':
entity_type_mode = "all"
elif self.feature_store_mode == 'list':
feature_store = next(
(fs for fs in self.config["legacy_feature_store"]["feature_stores"] if fs["name"] == feature_store_id),
None
)
if not feature_store:
raise ValueError(f"FeatureStore with name '{feature_store_id}' not found in the configuration.")
entity_type_mode = feature_store.get("entity_type_mode", "all")
else:
raise ValueError("Invalid value specified for `feature_store_mode`. "
"Expected one of `all` or `list`")
# List Entity Types
list_entity_type_request = aiplatform_v1.ListEntityTypesRequest(
parent=f"projects/{self.project_id}/locations/{self.region}/featurestores/{feature_store_id}")
list_entity_type_response = self.client.list_entity_types(request=list_entity_type_request)
if entity_type_mode == "list":
filtered_entity_types = [
entity_type
for entity_type in list_entity_type_response
if any(et["name"] == entity_type.name.split('/')[-1] for et in feature_store["entity_types"])
]
elif entity_type_mode == "all":
filtered_entity_types = list_entity_type_response
else:
raise ValueError("Invalid value specified for `entity_type_mode`. "
"Expected one of `all` or `list`")
entity_types = [{
"name": entity_type.name.split('/')[-1],
"description": entity_type.description,
"labels": dict(entity_type.labels)
} for entity_type in filtered_entity_types]
logger.info(f"Entity Types to be migrated for '{feature_store_id}': \n{entity_types}")
return entity_type_mode, entity_types
def get_feature_mapping(self, feature_store_id, entity_type_name, features_list):
"""
Retrieves the feature mapping for a given entity type and feature store.
Args:
feature_store_id (str): The ID of the feature store.
entity_type_name (str): The name of the entity type.
features_list (List[dict]): The list of features.
Returns:
dict or None: The feature mapping, or None if not found.
"""
feature_store = next(
(fs for fs in self.config["legacy_feature_store"]["feature_stores"] if fs["name"] == feature_store_id),
None
)
if feature_store:
entity_type = next(
(et for et in feature_store["entity_types"] if et["name"] == entity_type_name),
None
)
if entity_type:
mapping_file = entity_type.get("mapping_file")
if not mapping_file:
return {}
try:
                with open(os.path.join(PROJECT_ROOT, 'config', mapping_file), 'r', encoding='utf-8') as csvfile:
reader = csv.DictReader(csvfile)
mapping_dict = {row['original_feature_name']: row['destination_feature_name'] for row in reader}
original_feature_names = set(mapping_dict.keys())
features_set = set(feature["name"] for feature in features_list)
if not original_feature_names.issubset(features_set):
missing_features = original_feature_names - features_set
raise ValueError(
f"The following feature names are not present in the EntityType"
f" `{entity_type_name}`: {', '.join(missing_features)}")
return mapping_dict
                except FileNotFoundError:
                    logger.error(f"Mapping file '{mapping_file}' not found for EntityType '{entity_type_name}'.")
                    raise
else:
raise ValueError(f"Entity type '{entity_type_name}' not found in FeatureStore '{feature_store_id}'.")
else:
raise ValueError(f"FeatureStore '{feature_store_id}' not found in the configuration.")
def list_feature_stores(self, config_feature_list=None):
"""
Lists all the feature stores for a given project.
Args:
config_feature_list (Optional[List[str]]): A list of feature store IDs to filter the results.
Returns:
dict: A dictionary of feature store details, keyed by feature store ID.
"""
list_feature_stores_request = aiplatform_v1.ListFeaturestoresRequest(
parent=f"projects/{self.project_id}/locations/{self.region}"
)
list_feature_store_response = self.client.list_featurestores(list_feature_stores_request)
response = {}
for feature_store in list_feature_store_response:
fs_name = feature_store.name.split('/')[-1]
labels = dict(feature_store.labels)
if config_feature_list and fs_name not in config_feature_list:
continue
online_serving_config = getattr(feature_store, "online_serving_config", None)
if online_serving_config:
if getattr(online_serving_config, "scaling", None):
response[fs_name] = {
"online_store_type": "bigtable",
"labels": labels,
"bigtable_min_node_count": online_serving_config.scaling.min_node_count,
"bigtable_max_node_count": online_serving_config.scaling.max_node_count,
"cpu_utilization_target": online_serving_config.scaling.cpu_utilization_target,
}
elif getattr(online_serving_config, "fixed_node_count", None):
response[fs_name] = {
"online_store_type": "bigtable",
"labels": labels,
"bigtable_min_node_count": online_serving_config.fixed_node_count,
"bigtable_max_node_count": 0,
"cpu_utilization_target": 0,
}
else:
response[fs_name] = {}
else:
response[fs_name] = {}
return response
def fetch_entity_id_column(self, feature_store_id, entity_type_name) -> str:
"""
Fetches the entity ID column for the given feature store and entity type.
Args:
feature_store_id (str): The ID of the feature store.
entity_type_name (str): The name of the entity type.
Returns:
str: The entity ID column name, or an empty string if not found.
"""
feature_stores = self.config["legacy_feature_store"].get("feature_stores")
if not feature_stores:
return ""
column_value = ""
for fs in feature_stores:
if fs["name"] == feature_store_id and fs.get("entity_types"):
for et in fs["entity_types"]:
if et["name"] == entity_type_name:
return et.get("entity_id_column", "")
return column_value
def export_feature_value(
self,
feature_store_id: str,
entity_type: str,
        features_list: List[str],
bq_dest: str,
feature_mapping: dict
):
"""
Exports feature values to a BigQuery destination.
Args:
feature_store_id (str): The ID of the feature store.
entity_type (str): The name of the entity type.
        features_list (List[str]): The list of feature names to export.
bq_dest (str): The BigQuery destination URI.
feature_mapping (dict): The feature mapping dictionary.
Returns:
The response from the export operation.
"""
dest = aiplatform_v1.FeatureValueDestination()
dest.bigquery_destination.output_uri = bq_dest
feature_selector = aiplatform_v1.FeatureSelector()
feature_selector.id_matcher.ids = features_list
destination_feature_settings = []
if feature_mapping:
for feature, destination in feature_mapping.items():
destination_mapping = aiplatform_v1.DestinationFeatureSetting(feature_id=feature,
destination_field=destination)
destination_feature_settings.append(destination_mapping)
        entity_type_path = self.client.entity_type_path(
            project=self.project_id,
            location=self.region,
            featurestore=feature_store_id,
            entity_type=entity_type
        )
        request = aiplatform_v1.ExportFeatureValuesRequest(
            entity_type=entity_type_path,
destination=dest,
feature_selector=feature_selector,
settings=destination_feature_settings)
        request.full_export.end_time = datetime.now(timezone.utc).replace(microsecond=0)
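        # A full export writes every feature value up to end_time; the API also
        # offers snapshot_export (latest value per entity), which this tool
        # does not use.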
logger.info("Exporting feature values")
op = self.client.export_feature_values(request=request)
response = poll_operation(client=self.client, operation_name=op.operation.name)
return response
def _export_individual_feature(self, feature_store_id, entity_type, features_list, feature_mapping, fs_detail):
"""
Exports the features for a given entity type.
Args:
feature_store_id (str): The ID of the feature store.
entity_type (dict): The entity type information.
features_list (list[dict]): The list of features.
feature_mapping (dict): The feature mapping.
fs_detail (dict): The details of the feature store.
Returns:
dict: A dictionary containing the exported entity type information.
"""
entity_type_name = entity_type['name']
dataset_id = self.dataset_id_prefix + feature_store_id
table_id = self.table_id_prefix + entity_type_name
bq_dest_uri = f"bq://{self.project_id}.{dataset_id}.{table_id}"
logger.debug(f"BQ Export Path for '{entity_type_name}': {bq_dest_uri}")
feature_names_list = [feature["name"] for feature in features_list]
self.export_feature_value(feature_store_id=feature_store_id,
entity_type=entity_type_name,
features_list=feature_names_list,
bq_dest=bq_dest_uri,
feature_mapping=feature_mapping)
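        # Features whose names collide with the bookkeeping columns written by
        # the export itself (entity ID / timestamp columns) are skipped when
        # updating BigQuery column descriptions.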
reserved_keywords = ["entity_id", "feature_timestamp", "arrival_timestamp"]
final_features_list = [feature for feature in features_list if
feature['name'] not in reserved_keywords]
if feature_mapping:
final_features_list = [{**feature, 'name': feature_mapping[feature['name']]}
for feature in final_features_list]
update_bq_column_descriptions(bq_dest_uri,
final_features_list,
entity_type["description"])
entity_id_column = f"entity_type_{entity_type_name}"
dest_column = self.fetch_entity_id_column(feature_store_id, entity_type_name)
if dest_column:
logger.info("Processing entity id column renaming")
rename_bigquery_column(bq_dest_uri, entity_id_column, dest_column)
entity_id_column = dest_column
return {
"entity_type": entity_type,
"features": final_features_list,
"entity_id_column": entity_id_column,
"bq_dest": bq_dest_uri,
"online_store": fs_detail
}
def _export_entity_types(self, feature_store_id, entity_types, entity_type_mode, fs_detail):
"""
Export the entity types for a given feature store.
Args:
feature_store_id (str): The ID of the feature store.
entity_types (list): The list of entity types.
entity_type_mode (str): The mode for entity types ('all' or 'list').
fs_detail (dict): The details of the online feature store config.
Returns:
list: A list of exported entity type results.
"""
exported_entity_type_result = []
for entity_type in entity_types:
entity_type_name = entity_type['name']
logger.info(f"Processing EntityType: {entity_type_name}")
# Get feature mapping, if exists
features_list = self.list_features(feature_store_id, entity_type_name)
if entity_type_mode == "all":
feature_mapping = None
else:
feature_mapping = self.get_feature_mapping(
feature_store_id=feature_store_id,
entity_type_name=entity_type_name,
features_list=features_list)
if feature_mapping:
features_list = [feature for feature in features_list
if feature['name'] in feature_mapping]
if not features_list:
logger.info(f"No Features found under EntityType '{entity_type_name}'")
logger.info(f"Skipping EntityType '{entity_type_name}'")
continue
try:
exported_entity_type_result.append(
self._export_individual_feature(feature_store_id, entity_type, features_list,
feature_mapping, fs_detail)
)
logger.info(f"Exported EntityType '{entity_type_name}' successfully!")
logger.info("-" * 100)
            except Exception as exc:
                logger.error(f"Failed to export entity_type '{entity_type_name}' with error: {exc}")
                raise
return exported_entity_type_result
def export_feature_store(self):
"""
Export feature stores from the legacy feature store system.
Returns:
dict: A dictionary containing the export details, including the project ID, region, and exported data for each feature store.
"""
config_feature_store_ids = None
if self.feature_store_mode != 'all':
config_feature_store_ids = [fs["name"] for fs in self.config["legacy_feature_store"]["feature_stores"]]
feature_store_details = self.list_feature_stores(config_feature_list=config_feature_store_ids)
exported_results = {
"project_id": self.project_id,
"region": self.region,
"export_details": {}
}
for feature_store_id, fs_detail in feature_store_details.items():
logger.info(f"Processing FeatureStore: {feature_store_id}")
# Create BQ dataset for export
dataset_id = self.dataset_id_prefix + feature_store_id
create_dataset_if_not_exists(self.project_id, dataset_id)
# Entity types to migrate
entity_type_mode, entity_types = self.list_entity_types(feature_store_id=feature_store_id)
exported_entity_type_result = self._export_entity_types(
feature_store_id, entity_types, entity_type_mode, fs_detail)
exported_results["export_details"][feature_store_id] = exported_entity_type_result
logger.info(f"Successfully exported all EntityTypes for FeatureStore: {feature_store_id}")
logger.info("=" * 100)
logger.info(f"Overall Export Status: \n{exported_results}")
logger.info("*" * 50 + " Export Completed " + "*" * 50)
return exported_results
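

if __name__ == "__main__":
    # Minimal sketch of running the exporter standalone; the migration kit's
    # actual entry point may differ.
    exporter = LegacyExporter()
    exporter.export_feature_store()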