# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Trains a Keras model to predict income bracket from other Census data."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import logging
import os
import shutil
import tempfile
from builtins import int
from time import time

import mlflow
import mlflow.tensorflow
import tensorflow as tf
from mlflow import pyfunc
from tensorflow.python.saved_model import tag_constants

from . import model
from . import model_deployment
from . import utils

# Uncomment to let MLflow automatically log TensorFlow metrics and artifacts:
# mlflow.tensorflow.autolog()
def get_args():
    """Argument parser.

    Returns:
      args: argparse.Namespace with the parsed arguments.
    """
parser = argparse.ArgumentParser()
parser.add_argument(
'--train-files',
help='GCS file or local paths to training data',
default='gs://cloud-samples-data/ml-engine/census/data/adult.data.csv')
parser.add_argument(
'--eval-files',
help='GCS file or local paths to evaluation data',
default='gs://cloud-samples-data/ml-engine/census/data/adult.test.csv')
parser.add_argument(
'--job-dir',
type=str,
required=True,
help='Local or GCS location for writing checkpoints and exporting '
'models')
parser.add_argument(
'--num-epochs',
type=int,
default=20,
help='Number of times to go through the data, default=20')
    parser.add_argument(
        '--batch-size',
        default=64,
        type=int,
        help='Number of records to read during each training step, default=64')
parser.add_argument(
'--learning-rate',
default=.01,
type=float,
help='Learning rate for gradient descent, default=.01')
    parser.add_argument(
        '--eval-steps',
        help='Number of validation steps to run at the end of each epoch, '
             'default=1',
        default=1,
        type=int)
parser.add_argument(
'--reuse-job-dir',
action='store_true',
default=False,
help="""
Flag to decide if the model checkpoint should be re-used from the
job-dir.
If set to False then the job-dir will be deleted.
""")
parser.add_argument(
'--verbosity',
choices=['DEBUG', 'ERROR', 'FATAL', 'INFO', 'WARN'],
default='INFO')
    parser.add_argument(
        '--deploy-gcp',
        action='store_true',
        default=False,
        help='Deploy the trained model to AI Platform after training')
parser.add_argument(
'--model-reload',
action='store_true',
default=False,
help='Reload model using pyfunc')
parser.add_argument(
'--project-id',
type=str,
help='AI Platform project id')
    parser.add_argument(
        '--mlflow-tracking-uri',
        type=str,
        default='',
        help='MLflow tracking URI')
    parser.add_argument(
        '--gcs-bucket',
        type=str,
        default='mlflow_gcp',
        help='GCS bucket used to stage the model for AI Platform deployment')
    parser.add_argument(
        '--model-name',
        type=str,
        default='mlflow',
        help='AI Platform model name')
    parser.add_argument(
        '--run-time-version',
        type=str,
        default='1.15',
        help='AI Platform runtime version')
args, _ = parser.parse_known_args()
return args
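# Example local run (the paths and tracking URI below are illustrative only):
#   python -m trainer.task \
#       --job-dir=/tmp/mlflow_gcp \
#       --num-epochs=5 \
#       --batch-size=64 \
#       --learning-rate=0.01 \
#       --mlflow-tracking-uri=http://localhost:5000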
def _mlflow_log_metrics(metrics, metric_name):
    """Records one metric value per epoch using the step parameter of
    mlflow.log_metric.

    :param metrics: dict of per-epoch metric lists, as in History.history
    :param metric_name: key of the metric to log, e.g. 'loss' or 'acc'
    :return: None
    """
    for epoch, metric in enumerate(metrics[metric_name], 1):
        mlflow.log_metric(metric_name, metric, step=epoch)
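# For example, given metrics = {'acc': [0.786, 0.833]},
# _mlflow_log_metrics(metrics, 'acc') logs acc=0.786 at step 1 and
# acc=0.833 at step 2, producing a per-epoch curve in the MLflow UI.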
def train_and_evaluate(args):
    """Trains and evaluates the Keras model.

    Uses the Keras model defined in model.py and trains on data loaded and
    preprocessed in utils.py. Saves the trained model in TensorFlow SavedModel
    format to the path defined in part by the --job-dir argument.

    The History object returned by fit() contains per-epoch lists, e.g.:
      {'loss': [0.5699903990809373, 0.3629718415849791],
       'acc': [0.78604823, 0.8331693],
       'val_loss': [0.3966572880744934, 0.3477487564086914],
       'val_acc': [0.8278044, 0.8281116],
       'lr': [0.02, 0.015]}

    Args:
      args: argparse.Namespace of arguments - see get_args() for details
    """
logging.info('Resume training: {}'.format(args.reuse_job_dir))
if not args.reuse_job_dir:
if tf.io.gfile.exists(args.job_dir):
tf.io.gfile.rmtree(args.job_dir)
logging.info(
'Deleted job_dir {} to avoid re-use'.format(args.job_dir))
else:
logging.info('Reusing job_dir {} if it exists'.format(args.job_dir))
train_x, train_y, eval_x, eval_y = utils.load_data(args.train_files,
args.eval_files)
# dimensions
num_train_examples, input_dim = train_x.shape
num_eval_examples = eval_x.shape[0]
# Create the Keras Model
keras_model = model.create_keras_model(
input_dim=input_dim, learning_rate=args.learning_rate)
# Pass a numpy array by passing DataFrame.values
training_dataset = model.input_fn(
features=train_x.values,
labels=train_y,
shuffle=True,
num_epochs=args.num_epochs,
batch_size=args.batch_size)
# Pass a numpy array by passing DataFrame.values
validation_dataset = model.input_fn(
features=eval_x.values,
labels=eval_y,
shuffle=False,
num_epochs=args.num_epochs,
batch_size=num_eval_examples)
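    # Training shuffles and batches with --batch-size, while validation is
    # fed unshuffled with the entire eval set as a single batch, so one
    # validation step covers every evaluation example.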
start_time = time()
# Set MLflow tracking URI
if args.mlflow_tracking_uri:
mlflow.set_tracking_uri(args.mlflow_tracking_uri)
# Train model
with mlflow.start_run() as active_run:
run_id = active_run.info.run_id
# Callbacks
class MlflowCallback(tf.keras.callbacks.Callback):
# This function will be called after training completes.
def on_train_end(self, logs=None):
mlflow.log_param('num_layers', len(self.model.layers))
mlflow.log_param('optimizer_name',
type(self.model.optimizer).__name__)
# MLflow callback
mlflow_callback = MlflowCallback()
# Setup Learning Rate decay callback.
lr_decay_callback = tf.keras.callbacks.LearningRateScheduler(
lambda epoch: args.learning_rate + 0.02 * (0.5 ** (1 + epoch)),
verbose=False)
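        # The schedule above adds an exponentially decaying term to the base
        # rate: lr(epoch) = learning_rate + 0.02 * 0.5 ** (1 + epoch). With
        # the default learning_rate=0.01 this gives 0.02 at epoch 0 and
        # 0.015 at epoch 1, matching the 'lr' list shown in the docstring.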
# Setup TensorBoard callback.
tensorboard_path = os.path.join(args.job_dir, run_id, 'tensorboard')
tensorboard_callback = tf.keras.callbacks.TensorBoard(
tensorboard_path,
histogram_freq=1)
history = keras_model.fit(
training_dataset,
steps_per_epoch=int(num_train_examples / args.batch_size),
epochs=args.num_epochs,
validation_data=validation_dataset,
validation_steps=args.eval_steps,
verbose=1,
callbacks=[lr_decay_callback, tensorboard_callback,
mlflow_callback])
metrics = history.history
logging.info(metrics)
keras_model.summary()
mlflow.log_param('train_files', args.train_files)
mlflow.log_param('eval_files', args.eval_files)
mlflow.log_param('num_epochs', args.num_epochs)
mlflow.log_param('batch_size', args.batch_size)
mlflow.log_param('learning_rate', args.learning_rate)
mlflow.log_param('train_samples', num_train_examples)
mlflow.log_param('eval_samples', num_eval_examples)
mlflow.log_param('eval_steps', args.eval_steps)
mlflow.log_param('steps_per_epoch',
int(num_train_examples / args.batch_size))
# Add metrics
_mlflow_log_metrics(metrics, 'loss')
_mlflow_log_metrics(metrics, 'acc')
_mlflow_log_metrics(metrics, 'val_loss')
_mlflow_log_metrics(metrics, 'val_acc')
_mlflow_log_metrics(metrics, 'lr')
# Export SavedModel
model_local_path = os.path.join(args.job_dir, run_id, 'model')
tf.keras.experimental.export_saved_model(keras_model, model_local_path)
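        # export_saved_model is the TF 1.x Keras export API; it matches the
        # default --run-time-version=1.15 targeted for AI Platform serving.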
# Define artifacts.
logging.info('Model exported to: {}'.format(model_local_path))
        # Workaround: MLflow cannot read artifacts directly from a GCS path,
        # so copy them to a local temp folder first.
        # https://github.com/mlflow/mlflow/issues/1765
if model_local_path.startswith('gs://'):
logging.info('Creating temp folder')
temp = tempfile.mkdtemp()
model_deployment.copy_artifacts(model_local_path, temp)
model_local_path = os.path.join(temp, 'model')
if tensorboard_path.startswith('gs://'):
logging.info('Creating temp folder')
temp = tempfile.mkdtemp()
model_deployment.copy_artifacts(tensorboard_path, temp)
tensorboard_path = temp
mlflow.tensorflow.log_model(tf_saved_model_dir=model_local_path,
tf_meta_graph_tags=[tag_constants.SERVING],
tf_signature_def_key='serving_default',
artifact_path='model')
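        # log_model records the SavedModel as an MLflow artifact under the
        # 'model' path of this run, which is what get_artifact_uri('model')
        # resolves below when the model is reloaded.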
        # Reload the logged model via pyfunc to verify the artifact loads.
if args.model_reload:
mlflow.pyfunc.load_model(mlflow.get_artifact_uri('model'))
logging.info('Uploading TensorFlow events as a run artifact.')
mlflow.log_artifacts(tensorboard_path)
logging.info(
'Launch TensorBoard with:\n\ntensorboard --logdir=%s' %
tensorboard_path)
duration = time() - start_time
mlflow.log_metric('duration', duration)
mlflow.end_run()
    # Deploy the model to AI Platform.
    if args.deploy_gcp:
        if not args.project_id:
            raise ValueError('No project ID is defined')
        if not args.gcs_bucket:
            raise ValueError('No GCS bucket is defined')
        # Create AI Platform helper instance.
        model_helper = model_deployment.AIPlatformModel(
            project_id=args.project_id)
        # Stage the model in GCS for deployment if it is not there already.
        if model_local_path.startswith('gs://'):
            model_gcs_path = model_local_path
        else:
            model_gcs_path = os.path.join('gs://', args.gcs_bucket, run_id,
                                          'model')
            model_deployment.copy_artifacts(model_local_path, model_gcs_path)
        # Create the model resource.
        model_helper.create_model(args.model_name)
        # Create a model version from the exported SavedModel.
        model_helper.deploy_model(model_gcs_path, args.model_name, run_id,
                                  args.run_time_version)
        logging.info('Model deployment in GCP completed')
    # Clean up the temp folders created for the MLflow GCS workaround. This
    # happens after deployment, which may still need the local model copy.
    if args.job_dir.startswith('gs://'):
        shutil.rmtree(model_local_path, ignore_errors=True)
        shutil.rmtree(tensorboard_path, ignore_errors=True)
    logging.info(
        'This model took: {} seconds to train and test.'.format(duration))
if __name__ == '__main__':
args = get_args()
tf.compat.v1.logging.set_verbosity(args.verbosity)
train_and_evaluate(args)
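# Example AI Platform submission (job name, bucket and region illustrative):
#   gcloud ai-platform jobs submit training "census_$(date +%s)" \
#       --package-path=trainer \
#       --module-name=trainer.task \
#       --runtime-version=1.15 \
#       --region=us-central1 \
#       --job-dir=gs://your-bucket/census \
#       -- \
#       --deploy-gcp \
#       --project-id=your-project \
#       --gcs-bucket=your-bucket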