vertex_ai/deploy_model/deploy_model.py (126 lines of code) (raw):

import argparse import os import sys import logging import json print("=========== model deployment =============") from google.cloud import aiplatform as vertex_ai print("=========== model deployment =============") SCRIPT_DIR = os.path.dirname( os.path.realpath(os.path.join(os.getcwd(), os.path.expanduser(__file__))) ) sys.path.append(os.path.normpath(os.path.join(SCRIPT_DIR, ".."))) SERVING_SPEC_FILEPATH = 'deploy_model/serving_resources_spec.json' def get_args(): parser = argparse.ArgumentParser() parser.add_argument( '--mode', type=str, ) parser.add_argument( '--project', type=str, ) parser.add_argument( '--region', type=str, ) parser.add_argument( '--endpoint-display-name', type=str, ) parser.add_argument( '--model-display-name', type=str, ) parser.add_argument( '--pipeline-name', type=str, ) parser.add_argument( '--pipelines-store', type=str, ) return parser.parse_args() def create_endpoint(project, region, endpoint_display_name): logging.info(f"Creating endpoint {endpoint_display_name}") vertex_ai.init( project=project, location=region ) endpoints = vertex_ai.Endpoint.list( filter=f'display_name={endpoint_display_name}', order_by="update_time") if len(endpoints) > 0: logging.info(f"Endpoint {endpoint_display_name} already exists.") endpoint = endpoints[-1] else: endpoint = vertex_ai.Endpoint.create(endpoint_display_name) logging.info(f"Endpoint is ready.") logging.info(endpoint.gca_resource) return endpoint def deploy_model(project, region, endpoint_display_name, model_display_name, serving_resources_spec): logging.info(f"Deploying model {model_display_name} to endpoint {endpoint_display_name}") vertex_ai.init( project=project, location=region ) model = vertex_ai.Model.list( filter=f'display_name={model_display_name}', order_by="update_time" )[-1] endpoint = vertex_ai.Endpoint.list( filter=f'display_name={endpoint_display_name}', order_by="update_time" )[-1] deployed_model = endpoint.deploy(model=model, **serving_resources_spec) logging.info(f"Model is deployed.") logging.info(deployed_model) return deployed_model def compile_pipeline(pipeline_name): from src.tfx_pipelines import runner pipeline_definition_file = f"{pipeline_name}.json" pipeline_definition = runner.compile_training_pipeline(pipeline_definition_file) return pipeline_definition def main(): args = get_args() if args.mode == 'create-endpoint': if not args.project: raise ValueError("project must be supplied.") if not args.region: raise ValueError("region must be supplied.") if not args.endpoint_display_name: raise ValueError("endpoint_display_name must be supplied.") result = create_endpoint( args.project, args.region, args.endpoint_display_name ) elif args.mode == 'deploy-model': if not args.project: raise ValueError("project must be supplied.") if not args.region: raise ValueError("region must be supplied.") if not args.endpoint_display_name: raise ValueError("endpoint-display-name must be supplied.") if not args.model_display_name: raise ValueError("model-display-name must be supplied.") with open(SERVING_SPEC_FILEPATH) as json_file: serving_resources_spec = json.load(json_file) logging.info(f"serving resources: {serving_resources_spec}") result = deploy_model( args.project, args.region, args.endpoint_display_name, args.model_display_name, serving_resources_spec ) elif args.mode == 'compile-pipeline': if not args.pipeline_name: raise ValueError("pipeline-name must be supplied.") result = compile_pipeline(args.pipeline_name) else: raise ValueError(f"Invalid mode {args.mode}.") logging.info(result) if __name__ == "__main__": main()