10_mlops/train_on_vertexai.py (252 lines of code) (raw):

# Copyright 2017-2021 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import logging from datetime import datetime import tensorflow as tf from google.cloud import aiplatform from google.cloud.aiplatform import gapic as aip from google.cloud.aiplatform import hyperparameter_tuning as hpt from kfp.v2 import compiler, dsl ENDPOINT_NAME = 'flights' def train_custom_model(data_set, timestamp, develop_mode, cpu_only_mode, tf_version, extra_args=None): # Set up training and deployment infra if cpu_only_mode: train_image='us-docker.pkg.dev/vertex-ai/training/tf-cpu.{}:latest'.format(tf_version) deploy_image='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.{}:latest'.format(tf_version) else: train_image = "us-docker.pkg.dev/vertex-ai/training/tf-gpu.{}:latest".format(tf_version) deploy_image = "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.{}:latest".format(tf_version) # train model_display_name = '{}-{}'.format(ENDPOINT_NAME, timestamp) job = aiplatform.CustomTrainingJob( display_name='train-{}'.format(model_display_name), script_path="model.py", container_uri=train_image, requirements=['cloudml-hypertune'], # any extra Python packages model_serving_container_image_uri=deploy_image ) model_args = [ '--bucket', BUCKET, ] if develop_mode: model_args += ['--develop'] if extra_args: model_args += extra_args if cpu_only_mode: model = job.run( dataset=data_set, # See https://googleapis.dev/python/aiplatform/latest/aiplatform.html# predefined_split_column_name='data_split', model_display_name=model_display_name, args=model_args, replica_count=1, machine_type='n1-standard-4', sync=develop_mode ) else: model = job.run( dataset=data_set, # See https://googleapis.dev/python/aiplatform/latest/aiplatform.html# predefined_split_column_name='data_split', model_display_name=model_display_name, args=model_args, replica_count=1, machine_type='n1-standard-4', # See https://cloud.google.com/vertex-ai/docs/general/locations#accelerators accelerator_type=aip.AcceleratorType.NVIDIA_TESLA_T4.name, accelerator_count=1, sync=develop_mode ) return model def train_automl_model(data_set, timestamp, develop_mode): # train model_display_name = '{}-{}'.format(ENDPOINT_NAME, timestamp) job = aiplatform.AutoMLTabularTrainingJob( display_name='train-{}'.format(model_display_name), optimization_prediction_type='classification' ) model = job.run( dataset=data_set, # See https://googleapis.dev/python/aiplatform/latest/aiplatform.html# predefined_split_column_name='data_split', target_column='ontime', model_display_name=model_display_name, budget_milli_node_hours=(300 if develop_mode else 2000), disable_early_stopping=False, export_evaluated_data_items=True, export_evaluated_data_items_bigquery_destination_uri='{}:dsongcp.ch9_automl_evaluated'.format(PROJECT), export_evaluated_data_items_override_destination=True, sync=develop_mode ) return model def do_hyperparameter_tuning(data_set, timestamp, develop_mode, cpu_only_mode, tf_version): # Vertex AI services require regional API endpoints. if cpu_only_mode: train_image='us-docker.pkg.dev/vertex-ai/training/tf-cpu.{}:latest'.format(tf_version) else: train_image = "us-docker.pkg.dev/vertex-ai/training/tf-gpu.{}:latest".format(tf_version) # a single trial job model_display_name = '{}-{}'.format(ENDPOINT_NAME, timestamp) if cpu_only_mode: trial_job = aiplatform.CustomJob.from_local_script( display_name='train-{}'.format(model_display_name), script_path="model.py", container_uri=train_image, args=[ '--bucket', BUCKET, '--skip_full_eval', # no need to evaluate on test data set '--num_epochs', '10', '--num_examples', '500000' # 1/10 actual size to finish faster ], requirements=['cloudml-hypertune'], # any extra Python packages replica_count=1, machine_type='n1-standard-4' ) else: trial_job = aiplatform.CustomJob.from_local_script( display_name='train-{}'.format(model_display_name), script_path="model.py", container_uri=train_image, args=[ '--bucket', BUCKET, '--skip_full_eval', # no need to evaluate on test data set '--num_epochs', '10', '--num_examples', '500000' # 1/10 actual size to finish faster ], requirements=['cloudml-hypertune'], # any extra Python packages replica_count=1, machine_type='n1-standard-4', # See https://cloud.google.com/vertex-ai/docs/general/locations#accelerators accelerator_type=aip.AcceleratorType.NVIDIA_TESLA_T4.name, accelerator_count=1, ) # the tuning job hparam_job = aiplatform.HyperparameterTuningJob( # See https://googleapis.dev/python/aiplatform/latest/aiplatform.html# display_name='hparam-{}'.format(model_display_name), custom_job=trial_job, metric_spec={'val_rmse': 'minimize'}, parameter_spec={ "train_batch_size": hpt.IntegerParameterSpec(min=16, max=256, scale='log'), "nbuckets": hpt.IntegerParameterSpec(min=5, max=10, scale='linear'), "dnn_hidden_units": hpt.CategoricalParameterSpec(values=["64,16", "64,16,4", "64,64,64,8", "256,64,16"]) }, max_trial_count=2 if develop_mode else NUM_HPARAM_TRIALS, parallel_trial_count=2, search_algorithm=None, # Bayesian ) hparam_job.run(sync=True) # has to finish before we can get trials. # get the parameters corresponding to the best trial best = sorted(hparam_job.trials, key=lambda x: x.final_measurement.metrics[0].value)[0] logging.info('Best trial: {}'.format(best)) best_params = [] for param in best.parameters: best_params.append('--{}'.format(param.parameter_id)) if param.parameter_id in ["train_batch_size", "nbuckets"]: # hparam returns 10.0 even though it's an integer param. so round it. # but CustomTrainingJob makes integer args into floats. so make it a string best_params.append(str(int(round(param.value)))) else: # string or float parameters best_params.append(param.value) # run the best trial to completion logging.info('Launching full training job with {}'.format(best_params)) return train_custom_model(data_set, timestamp, develop_mode, cpu_only_mode, tf_version, extra_args=best_params) @dsl.pipeline(name="flights-ch9-pipeline", description="ds-on-gcp ch9 flights pipeline" ) def main(): aiplatform.init(project=PROJECT, location=REGION, staging_bucket='gs://{}'.format(BUCKET)) # create data set all_files = tf.io.gfile.glob('gs://{}/ch9/data/all*.csv'.format(BUCKET)) logging.info("Training on {}".format(all_files)) data_set = aiplatform.TabularDataset.create( display_name='data-{}'.format(ENDPOINT_NAME), gcs_source=all_files ) if TF_VERSION is not None: tf_version = TF_VERSION.replace(".", "-") else: tf_version = '2-' + tf.__version__[2:3] # train if AUTOML: model = train_automl_model(data_set, TIMESTAMP, DEVELOP_MODE) elif NUM_HPARAM_TRIALS > 1: model = do_hyperparameter_tuning(data_set, TIMESTAMP, DEVELOP_MODE, CPU_ONLY_MODE, tf_version) else: model = train_custom_model(data_set, TIMESTAMP, DEVELOP_MODE, CPU_ONLY_MODE, tf_version) # create endpoint if it doesn't already exist endpoints = aiplatform.Endpoint.list( filter='display_name="{}"'.format(ENDPOINT_NAME), order_by='create_time desc', project=PROJECT, location=REGION, ) if len(endpoints) > 0: endpoint = endpoints[0] # most recently created else: endpoint = aiplatform.Endpoint.create( display_name=ENDPOINT_NAME, project=PROJECT, location=REGION, sync=DEVELOP_MODE ) # deploy model.deploy( endpoint=endpoint, traffic_split={"0": 100}, machine_type='n1-standard-2', min_replica_count=1, max_replica_count=1, sync=DEVELOP_MODE ) if DEVELOP_MODE: model.wait() def run_pipeline(): compiler.Compiler().compile(pipeline_func=main, package_path='flights_pipeline.json') job = aip.PipelineJob( display_name="{}-pipeline".format(ENDPOINT_NAME), template_path="{}_pipeline.json".format(ENDPOINT_NAME), pipeline_root="{}/pipeline_root/intro".format(BUCKET), enable_caching=False ) job.run() if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument( '--bucket', help='Data will be read from gs://BUCKET/ch9/data and checkpoints will be in gs://BUCKET/ch9/trained_model', required=True ) parser.add_argument( '--region', help='Where to run the trainer', default='us-central1' ) parser.add_argument( '--project', help='Project to be billed', required=True ) parser.add_argument( '--develop', help='Train on a small subset in development', dest='develop', action='store_true') parser.set_defaults(develop=False) parser.add_argument( '--automl', help='Train an AutoML Table, instead of using model.py', dest='automl', action='store_true') parser.set_defaults(automl=False) parser.add_argument( '--num_hparam_trials', help='Number of hyperparameter trials. 0/1 means no hyperparam. Ignored if --automl is set.', type=int, default=0) parser.add_argument( '--pipeline', help='Run as pipeline', dest='pipeline', action='store_true') parser.add_argument( '--cpuonly', help='Run without GPU', dest='cpuonly', action='store_true') parser.set_defaults(cpuonly=False) parser.add_argument( '--tfversion', help='TensorFlow version to use' ) # parse args logging.getLogger().setLevel(logging.INFO) args = parser.parse_args().__dict__ BUCKET = args['bucket'] PROJECT = args['project'] REGION = args['region'] DEVELOP_MODE = args['develop'] CPU_ONLY_MODE = args['cpuonly'] TF_VERSION = args['tfversion'] AUTOML = args['automl'] NUM_HPARAM_TRIALS = args['num_hparam_trials'] TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S") if args['pipeline']: run_pipeline() else: main()