# trending_deploy/deploy.py
from typing import List, Literal, Dict, Any
import os
from huggingface_hub import (
InferenceClient,
create_inference_endpoint,
get_inference_endpoint,
get_collection,
list_inference_endpoints,
model_info as get_model_info,
add_collection_item,
delete_collection_item,
)
from trending_deploy.constants import Model, MEMORY_USAGE_TO_INSTANCE, Instance, INSTANCES
from trending_deploy.models import get_num_parameters_from_model, get_viable_instance_from_num_parameters
# Configuration
HF_TOKEN = os.environ.get("HF_TOKEN")
VENDOR = "aws"
REGION = "us-east-1"
TYPE = "protected"
NAMESPACE = "hf-inference"
ENDPOINT_PREFIX = "auto-"
COLLECTION_SLUG = "hf-inference/deployed-models-680a42b770e6b6cd546c3fbc"
DEFAULT_INSTANCE_TYPE = "intel-spr-overcommitted"
DEFAULT_INSTANCE_SIZE = "x16"
IMAGE = "registry.internal.huggingface.tech/hf-endpoints/inference-pytorch-cpu:api-inference-6.3.1"
# Instance size mapping based on instance memory
# Maps instance memory to HF instance size (x1, x2, etc.)
# Ensure INSTANCES are sorted by memory for reliable indexing later
SORTED_INSTANCES = sorted(INSTANCES, key=lambda x: x.memory_usage_bytes)
INSTANCE_SIZE_MAPPING = {
instance.memory_usage_bytes: f"x{2**(i)}"
for i, instance in enumerate(SORTED_INSTANCES)
}
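# For example (hypothetical footprints; the real values come from trending_deploy.constants.INSTANCES),
# instances of 2, 4 and 8 GiB would produce {2 GiB: "x1", 4 GiB: "x2", 8 GiB: "x4"},
# i.e. the i-th smallest instance maps to size f"x{2**i}".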
def load_deployed_models() -> List[str]:
"""
Load the list of models that are already deployed.
"""
try:
endpoints = list_inference_endpoints(namespace=NAMESPACE)
# Extract model names from endpoints starting with our prefix
deployed_models = []
for endpoint in endpoints:
if endpoint.name.startswith(ENDPOINT_PREFIX):
# Extract the model name from the repository field
deployed_models.append(endpoint.repository)
return deployed_models
except Exception as e:
print(f"Error loading deployed models: {e}")
return []
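# Example (hypothetical): if the namespace contains an endpoint "auto-bge-small-en-v1-5" whose
# repository is "BAAI/bge-small-en-v1.5" plus a manually created "prod-foo" endpoint,
# load_deployed_models() returns ["BAAI/bge-small-en-v1.5"]; names without ENDPOINT_PREFIX are skipped.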
def deploy_model(model: Model) -> bool:
"""
Deploy the specified model.
Args:
model (Model): The Model object containing model_info and viable_instance.
Returns:
bool: True if the model was successfully deployed, False otherwise.
"""
try:
model_name = model.model_info.id
endpoint_name = f"{ENDPOINT_PREFIX}{model_name.split('/')[-1].replace('.', '-').replace('_', '-')}"[:31].lower()
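        # For example (hypothetical repo id), "sentence-transformers/all-MiniLM-L6-v2" yields
        # "auto-all-minilm-l6-v2": ENDPOINT_PREFIX plus the last path segment with "." and "_"
        # replaced by "-", truncated to 31 characters, then lowercased.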
# Get task from model info
task = model.model_info.pipeline_tag
        # Determine the instance size from the viable instance's memory footprint
        initial_memory = model.viable_instance.memory_usage_bytes
        instance_size = INSTANCE_SIZE_MAPPING.get(initial_memory, "x1")  # Fall back to x1 if the memory value is unmapped
        # For now the memory-based size is overridden and every endpoint uses the default size
        instance_size = DEFAULT_INSTANCE_SIZE
        # Increase the instance size by one notch for text-embeddings-inference models.
        # With custom images for embedding models this may no longer be needed, so the
        # upgrade is currently disabled and the default size is kept.
        if "text-embeddings-inference" in model.model_info.tags:
            # instance_size = increase_instance_size(model, instance_size, initial_memory)
            instance_size = DEFAULT_INSTANCE_SIZE
endpoint_kwargs = {
"name": endpoint_name,
"namespace": NAMESPACE,
"repository": model_name,
"framework": "pytorch",
"task": task,
"accelerator": "cpu",
"vendor": VENDOR,
"region": REGION,
"type": TYPE,
"instance_size": instance_size, # Use the potentially upgraded size
"instance_type": DEFAULT_INSTANCE_TYPE,
"min_replica": 1,
"scale_to_zero_timeout": None,
"domain": "api-inference.endpoints.huggingface.tech",
"path": f"/models/{model_name}",
"tags": ["auto", "api-inference"]
}
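        # Given the domain and path above, the deployed endpoint is expected to serve requests
        # at https://api-inference.endpoints.huggingface.tech/models/<model_name> (illustrative).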
# Override task
if task == "feature-extraction" and (
any(x in model.model_info.tags for x in ["sentence-transformers", "sentence transformers"])
or model.model_info.library_name == "sentence-transformers"
):
task = "sentence-embeddings"
endpoint_kwargs["custom_image"] = {
"health_route": "/health",
"port": 5000,
"url": IMAGE
}
endpoint_kwargs["env"] = {
"API_INFERENCE_COMPAT": "true",
"HF_MODEL_DIR": "/repository",
"HF_TASK": task,
"UNLOAD_IDLE": "true",
"IDLE_TIMEOUT": "60"
}
endpoint_kwargs["task"] = task
print(f"Creating endpoint {endpoint_name} for model {model_name} with instance size {instance_size}...")
endpoint = create_inference_endpoint(**endpoint_kwargs)
print(f"Waiting for endpoint {endpoint_name} to be ready...")
# Wait for deployment (with timeout to avoid blocking indefinitely)
endpoint.wait(timeout=300)
print(f"Endpoint {endpoint_name} for model {model_name} deployed successfully.")
add_collection_item(COLLECTION_SLUG, item_id=model_name, item_type="model")
return True
except Exception as e:
print(f"Error deploying model {model.model_info.id}: {e}")
return False
def increase_instance_size(model: Model, instance_size: str, initial_memory: int) -> str:
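    """
    Return the instance size one notch larger than the one mapped from `initial_memory`.

    Looks up `initial_memory` in SORTED_INSTANCES and, if a larger instance exists and has a
    mapping, returns its size (e.g. "x2" -> "x4"); otherwise the original `instance_size` is
    returned unchanged. The call site in deploy_model (for text-embeddings-inference models)
    is currently commented out.
    """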
model_name = model.model_info.id
current_index = -1
for i, instance in enumerate(SORTED_INSTANCES):
if instance.memory_usage_bytes == initial_memory:
current_index = i
break
if current_index != -1 and current_index + 1 < len(SORTED_INSTANCES):
next_memory = SORTED_INSTANCES[current_index + 1].memory_usage_bytes
upgraded_size = INSTANCE_SIZE_MAPPING.get(next_memory)
if upgraded_size:
print(f"Upgrading instance size for TEI model {model_name} from {instance_size} to {upgraded_size}")
instance_size = upgraded_size
else:
print(f"Warning: Could not find mapping for next instance size ({next_memory} bytes) for TEI model {model_name}. Using {instance_size}.")
elif current_index != -1:
print(f"Warning: TEI model {model_name} is already on the largest instance size ({instance_size}). Cannot upgrade further.")
return instance_size
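# Illustrative walk-through of increase_instance_size (hypothetical memory footprints; the real
# values come from trending_deploy.constants.INSTANCES):
#   SORTED_INSTANCES        -> [4 GiB, 8 GiB, 16 GiB]
#   INSTANCE_SIZE_MAPPING   -> {4 GiB: "x1", 8 GiB: "x2", 16 GiB: "x4"}
#   initial_memory = 8 GiB  -> next larger instance is 16 GiB, so the function returns "x4"
#   initial_memory = 16 GiB -> already the largest instance, so instance_size is returned unchanged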
def undeploy_model(model_name: str) -> bool:
"""
Undeploy the specified model.
Args:
model_name (str): The name of the model to undeploy.
Returns:
bool: True if the model was successfully undeployed, False otherwise.
"""
try:
# Find the endpoint for this model
endpoints = list_inference_endpoints(namespace=NAMESPACE)
endpoint_found = False
for endpoint in endpoints:
if endpoint.repository == model_name and endpoint.name.startswith(ENDPOINT_PREFIX):
print(f"Deleting endpoint {endpoint.name} for model {model_name}...")
endpoint.delete()
endpoint_found = True
print(f"Endpoint {endpoint.name} deleted successfully.")
break # Assuming only one endpoint per model
        if not endpoint_found:
            print(f"Warning: Endpoint for model {model_name} not found. Cannot undeploy.")
            # No matching endpoint exists, so there is nothing to tear down; treat this as success
            # (return False here instead if a missing endpoint should count as a failure)
            return True
# Find and delete the corresponding item in the collection
try:
collection = get_collection(COLLECTION_SLUG)
item_object_id_to_delete = None
for item in collection.items:
                # item_id is the model repo id (e.g., 'bert-base-uncased')
                # item_object_id is the collection item's internal object id
if item.item_type == "model" and item.item_id == model_name:
item_object_id_to_delete = item.item_object_id
break
if item_object_id_to_delete:
print(f"Deleting item for {model_name} (ID: {item_object_id_to_delete}) from collection {COLLECTION_SLUG}...")
delete_collection_item(COLLECTION_SLUG, item_object_id=item_object_id_to_delete)
print(f"Collection item for {model_name} deleted successfully.")
else:
print(f"Warning: Could not find item for model {model_name} in collection {COLLECTION_SLUG}.")
except Exception as e_coll:
print(f"Error managing collection item for model {model_name}: {e_coll}")
return True # Return True as endpoint deletion was the primary goal
except Exception as e:
print(f"Error during undeployment process for model {model_name}: {e}")
return False
def deploy_selected_models(models: List[Model]) -> dict[Literal["deployed_success", "deployed_failed", "undeployed_success", "undeployed_failed"], List[str]]:
"""
Deploy the selected models.
Args:
models (list[Model]): A list of selected models to deploy.
Returns:
dict: A dictionary containing lists of successfully and unsuccessfully deployed and undeployed models.
"""
to_deploy_models = {model.model_info.id: model for model in models}
deployed_model_names = set(load_deployed_models())
deployed_success = []
deployed_failed = []
for model_id in set(to_deploy_models.keys()) - deployed_model_names:
success = deploy_model(to_deploy_models[model_id])
if success:
deployed_success.append(model_id)
else:
deployed_failed.append(model_id)
undeployed_success = []
undeployed_failed = []
for model_to_undeploy in deployed_model_names - set(to_deploy_models.keys()):
success = undeploy_model(model_to_undeploy)
if success:
undeployed_success.append(model_to_undeploy)
else:
undeployed_failed.append(model_to_undeploy)
return {
"deployed_success": deployed_success,
"deployed_failed": deployed_failed,
"undeployed_success": undeployed_success,
"undeployed_failed": undeployed_failed,
}
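# Minimal usage sketch (illustrative; selecting which models to deploy happens outside this module):
#
#   selected: List[Model] = ...  # e.g. trending models paired with their viable instances
#   report = deploy_selected_models(selected)
#   print("deployed:", report["deployed_success"], "failed:", report["deployed_failed"])
#   print("undeployed:", report["undeployed_success"], "failed:", report["undeployed_failed"])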