trending_deploy/main.py (46 lines of code) (raw):

import json
import logging
import pprint
from pathlib import Path

from trending_deploy.deploy import deploy_selected_models
from trending_deploy.optimization import solve_knapsack
from trending_deploy.constants import Model, DEFAULT_TASKS
from trending_deploy.costs import get_cost_from_model
from trending_deploy.models import trending_models
from trending_deploy.rewards import get_reward_from_model


class Trending:
    """Pick a set of trending models that fits a budget and optionally deploy it.

    Calling an instance runs the whole pipeline: load candidate models,
    attach a reward and a cost to each one, solve the knapsack problem for
    the given budget, log a summary, optionally dump the selection to a JSON
    file and optionally deploy it.
    """

    def __init__(self, tasks: list[str] | None = None, max_models_per_task: int = 200, budget: int | None = 1000):
        """
        Initializes the instance with the specified tasks, maximum models per task, and budget.

        Args:
            tasks (list[str] | None, optional): A list of task names. If None, defaults to DEFAULT_TASKS.
                Defaults to None.
            max_models_per_task (int, optional): The maximum number of models allowed per task.
                Defaults to 200.
            budget (int | None, optional): The budget for the tasks in monthly dollar spend.
                Defaults to 1000.
        """
        if tasks is None:
            tasks = DEFAULT_TASKS
        self.tasks = tasks
        self.max_models_per_task = max_models_per_task
        self.budget = budget

    def __call__(self, budget: int | None = None, filename: str | Path | None = None, deploy_models: bool = True):
        """
        Runs the selection pipeline and optionally deploys the selected models.

        Args:
            budget (int | None, optional): Budget to use for this call. If None, the budget given to
                the constructor is used. An explicit 0 is honoured rather than replaced by the default.
            filename (str | Path | None, optional): If not None, the selected models are written to
                this path as JSON (one ``model.to_dict()`` entry per model).
            deploy_models (bool, optional): Whether to pass the selection to ``deploy_selected_models``.
                Defaults to True.

        Returns:
            A ``(selected_models, max_reward, spent_budget)`` tuple, where the three values are the
            ones returned by ``solve_knapsack`` (reordered).

        Raises:
            ValueError: If neither this call nor the constructor provided a budget.
        """
        # Step 1: Load the trending models as model candidates
        model_candidates: list[Model] = trending_models(
            tasks=self.tasks, max_models_per_task=self.max_models_per_task
        )

        # Step 2: Calculate the reward and cost for each model
        for model in model_candidates:
            task = model.model_info.pipeline_tag
            model.reward = get_reward_from_model(model.model_info, task=task)
            model.cost = get_cost_from_model(model.model_info, task=task, instance=model.viable_instance)

        # Step 3: Solve the knapsack optimization problem given the model rewards, costs, and budget
        # Compare against None explicitly: `budget or self.budget` would discard
        # an explicit budget of 0 and silently fall back to the instance default.
        if budget is None:
            budget = self.budget
        if budget is None:
            raise ValueError("Budget must be specified")
        max_reward, spent_budget, selected_models = solve_knapsack(model_candidates, budget)

        # Start every requested task at zero so tasks without a selected model
        # still show up in the summary. `.get` keeps the summary from raising
        # KeyError if a model carries a pipeline tag outside `self.tasks`.
        models_per_task = {task: 0 for task in self.tasks}
        for model in selected_models:
            tag = model.model_info.pipeline_tag
            models_per_task[tag] = models_per_task.get(tag, 0) + 1

        logging.info(f"Selected models: {len(selected_models)} out of {len(model_candidates)} candidate models")
        logging.info(f"Expected spent budget: ${spent_budget:,} out of ${budget:,}")
        logging.info(f"Maximum reward reached: {max_reward}")
        logging.info(f"Models per task:\n{pprint.pformat(models_per_task)}")

        if filename is not None:
            # default=str stringifies any value json cannot serialise natively.
            with open(filename, "w", encoding="utf-8") as f:
                json.dump([model.to_dict() for model in selected_models], f, indent=4, default=str)

        if deploy_models:
            # Step 4: Deploy the selected models
            results = deploy_selected_models(selected_models)
            logging.debug(f"Deployed models: {results['deployed_success']}")
            # NOTE(review): both warnings below are emitted even when nothing
            # failed -- consider guarding them on a non-empty result.
            logging.warning(f"Failed to deploy models: {results['deployed_failed']}")
            # NOTE(review): the message says "already deployed" but the key is
            # 'undeployed_success' -- confirm which of the two is intended.
            logging.debug(f"Models already deployed: {results['undeployed_success']}")
            logging.warning(f"Failed to undeploy models: {results['undeployed_failed']}")

        return selected_models, max_reward, spent_budget


if __name__ == "__main__":
    # The selection summary is logged at INFO level, which the root logger
    # drops by default; configure it so a script run actually shows the result.
    # basicConfig is a no-op if handlers were already installed elsewhere.
    logging.basicConfig(level=logging.INFO)

    trending = Trending(tasks=DEFAULT_TASKS, max_models_per_task=300, budget=10_000)
    selected_models, max_reward, spent_budget = trending(filename="selected_models.json", deploy_models=False)