10_mlops/model.py
# Copyright 2017-2021 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
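"""Trains and evaluates a wide-and-deep flight on-time model.

Reads CSV flight data (train/eval/test patterns on Cloud Storage), builds a
Keras wide-and-deep classifier over numeric, bucketized, crossed, and embedded
features, reports 'val_rmse' to Vertex AI hyperparameter tuning via hypertune,
and exports the trained model as a SavedModel. The Vertex AI training contract
(AIP_MODEL_DIR, AIP_TRAINING_DATA_URI, AIP_VALIDATION_DATA_URI, AIP_TEST_DATA_URI)
is honored when those environment variables are set.
"""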
import argparse
import logging
import os
import time
import hypertune
import numpy as np
import tensorflow as tf
BUCKET = None
TF_VERSION = '2-' + tf.__version__.split('.')[1]  # e.g. '2-8'; needed to choose container
DEVELOP_MODE = True
NUM_EXAMPLES = 5000 * 1000  # need not be precise, but get the order of magnitude right
NUM_BUCKETS = 5
NUM_EMBEDS = 3
TRAIN_BATCH_SIZE = 64
DNN_HIDDEN_UNITS = '64,32'
CSV_COLUMNS = (
'ontime,dep_delay,taxi_out,distance,origin,dest,dep_hour,is_weekday,carrier,' +
'dep_airport_lat,dep_airport_lon,arr_airport_lat,arr_airport_lon,data_split'
).split(',')
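# The Python type of each default below also sets the column dtype read by
# make_csv_dataset (float -> float32, int -> int32, str -> string).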
CSV_COLUMN_TYPES = [
1.0, -3.0, 5.0, 1037.493622678299, 'OTH', 'DEN', 21, 1.0, 'OO',
43.41694444, -124.24694444, 39.86166667, -104.67305556, 'TRAIN'
]
def features_and_labels(features):
label = features.pop('ontime') # this is what we will train for
return features, label
def read_dataset(pattern, batch_size, mode=tf.estimator.ModeKeys.TRAIN, truncate=None):
dataset = tf.data.experimental.make_csv_dataset(
pattern, batch_size,
column_names=CSV_COLUMNS,
column_defaults=CSV_COLUMN_TYPES,
sloppy=True,
num_parallel_reads=2,
ignore_errors=True,
num_epochs=1)
dataset = dataset.map(features_and_labels)
if mode == tf.estimator.ModeKeys.TRAIN:
dataset = dataset.shuffle(batch_size * 10)
dataset = dataset.repeat()
dataset = dataset.prefetch(1)
if truncate is not None:
dataset = dataset.take(truncate)
return dataset
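# Illustrative use (the GCS pattern is a placeholder): peek at one training batch with
#   for features, label in read_dataset('gs://BUCKET/ch9/data/train*', 2, truncate=1):
#       print(features['carrier'], label)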
def create_model():
real = {
colname: tf.feature_column.numeric_column(colname)
for colname in
(
'dep_delay,taxi_out,distance,dep_hour,is_weekday,' +
'dep_airport_lat,dep_airport_lon,' +
'arr_airport_lat,arr_airport_lon'
).split(',')
}
sparse = {
        'carrier': tf.feature_column.categorical_column_with_vocabulary_list(
            'carrier', vocabulary_list='AS,VX,F9,UA,US,WN,HA,EV,MQ,DL,OO,B6,NK,AA'.split(',')),
'origin': tf.feature_column.categorical_column_with_hash_bucket('origin', hash_bucket_size=1000),
'dest': tf.feature_column.categorical_column_with_hash_bucket('dest', hash_bucket_size=1000),
}
inputs = {
colname: tf.keras.layers.Input(name=colname, shape=(), dtype='float32')
for colname in real.keys()
}
inputs.update({
colname: tf.keras.layers.Input(name=colname, shape=(), dtype='string')
for colname in sparse.keys()
})
latbuckets = np.linspace(20.0, 50.0, NUM_BUCKETS).tolist() # USA
lonbuckets = np.linspace(-120.0, -70.0, NUM_BUCKETS).tolist() # USA
disc = {}
disc.update({
'd_{}'.format(key): tf.feature_column.bucketized_column(real[key], latbuckets)
for key in ['dep_airport_lat', 'arr_airport_lat']
})
disc.update({
'd_{}'.format(key): tf.feature_column.bucketized_column(real[key], lonbuckets)
for key in ['dep_airport_lon', 'arr_airport_lon']
})
# cross columns that make sense in combination
sparse['dep_loc'] = tf.feature_column.crossed_column(
[disc['d_dep_airport_lat'], disc['d_dep_airport_lon']], NUM_BUCKETS * NUM_BUCKETS)
sparse['arr_loc'] = tf.feature_column.crossed_column(
[disc['d_arr_airport_lat'], disc['d_arr_airport_lon']], NUM_BUCKETS * NUM_BUCKETS)
sparse['dep_arr'] = tf.feature_column.crossed_column([sparse['dep_loc'], sparse['arr_loc']], NUM_BUCKETS ** 4)
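    # dep_loc and arr_loc each hash into NUM_BUCKETS^2 buckets, so crossing them
    # yields up to NUM_BUCKETS^4 combinations; hence NUM_BUCKETS ** 4 hash buckets.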
# embed all the sparse columns
embed = {
'embed_{}'.format(colname): tf.feature_column.embedding_column(col, NUM_EMBEDS)
for colname, col in sparse.items()
}
real.update(embed)
# one-hot encode the sparse columns
sparse = {
colname: tf.feature_column.indicator_column(col)
for colname, col in sparse.items()
}
model = wide_and_deep_classifier(
inputs,
linear_feature_columns=sparse.values(),
dnn_feature_columns=real.values(),
dnn_hidden_units=DNN_HIDDEN_UNITS)
return model
def wide_and_deep_classifier(inputs, linear_feature_columns, dnn_feature_columns, dnn_hidden_units):
deep = tf.keras.layers.DenseFeatures(dnn_feature_columns, name='deep_inputs')(inputs)
layers = [int(x) for x in dnn_hidden_units.split(',')]
for layerno, numnodes in enumerate(layers):
deep = tf.keras.layers.Dense(numnodes, activation='relu', name='dnn_{}'.format(layerno + 1))(deep)
wide = tf.keras.layers.DenseFeatures(linear_feature_columns, name='wide_inputs')(inputs)
both = tf.keras.layers.concatenate([deep, wide], name='both')
output = tf.keras.layers.Dense(1, activation='sigmoid', name='pred')(both)
model = tf.keras.Model(inputs, output)
model.compile(optimizer='adam',
loss='binary_crossentropy',
metrics=['accuracy', rmse, tf.keras.metrics.AUC()])
return model
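# Both branches share the same Input layers: DenseFeatures applies the one-hot
# (wide) and numeric/embedding (deep) columns to the shared inputs dict, and the
# concatenated branches feed a single sigmoid unit. For instance,
# create_model().summary() lists the wide_inputs and deep_inputs layers.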
def rmse(y_true, y_pred):
return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))
def train_and_evaluate(train_data_pattern, eval_data_pattern, test_data_pattern, export_dir, output_dir):
train_batch_size = TRAIN_BATCH_SIZE
if DEVELOP_MODE:
eval_batch_size = 100
steps_per_epoch = 3
epochs = 2
num_eval_examples = eval_batch_size * 10
else:
eval_batch_size = 100
steps_per_epoch = NUM_EXAMPLES // train_batch_size
epochs = NUM_EPOCHS
num_eval_examples = eval_batch_size * 100
train_dataset = read_dataset(train_data_pattern, train_batch_size)
eval_dataset = read_dataset(eval_data_pattern, eval_batch_size, tf.estimator.ModeKeys.EVAL, num_eval_examples)
# checkpoint
checkpoint_path = '{}/checkpoints/flights.cpt'.format(output_dir)
logging.info("Checkpointing to {}".format(checkpoint_path))
cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path,
save_weights_only=True,
verbose=1)
# call back to write out hyperparameter tuning metric
METRIC = 'val_rmse'
hpt = hypertune.HyperTune()
class HpCallback(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
if logs and METRIC in logs:
logging.info("Epoch {}: {} = {}".format(epoch, METRIC, logs[METRIC]))
hpt.report_hyperparameter_tuning_metric(hyperparameter_metric_tag=METRIC,
metric_value=logs[METRIC],
global_step=epoch)
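    # Note: for these reports to drive tuning, the Vertex AI hyperparameter tuning
    # job should be configured with 'val_rmse' as its metric id (goal: minimize).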
# train the model
model = create_model()
logging.info(f"Training on {train_data_pattern}; eval on {eval_data_pattern}; {epochs} epochs; {steps_per_epoch}")
history = model.fit(train_dataset,
validation_data=eval_dataset,
epochs=epochs,
steps_per_epoch=steps_per_epoch,
callbacks=[cp_callback, HpCallback()])
# export
logging.info('Exporting to {}'.format(export_dir))
tf.saved_model.save(model, export_dir)
# write out final metric
final_rmse = history.history[METRIC][-1]
logging.info("Validation metric {} on {} samples = {}".format(METRIC, num_eval_examples, final_rmse))
if (not DEVELOP_MODE) and (test_data_pattern is not None) and (not SKIP_FULL_EVAL):
logging.info("Evaluating over full test dataset")
test_dataset = read_dataset(test_data_pattern, eval_batch_size, tf.estimator.ModeKeys.EVAL, None)
final_metrics = model.evaluate(test_dataset)
logging.info("Final metrics on full test dataset = {}".format(final_metrics))
else:
logging.info("Skipping evaluation on full test dataset")
if __name__ == '__main__':
logging.info("Tensorflow version " + tf.__version__)
parser = argparse.ArgumentParser()
parser.add_argument(
'--bucket',
help='Data will be read from gs://BUCKET/ch9/data and output will be in gs://BUCKET/ch9/trained_model',
required=True
)
parser.add_argument(
'--num_examples',
        help='Number of examples per epoch; only the order of magnitude needs to be right.',
type=int,
default=5000000
)
# for hyper-parameter tuning
parser.add_argument(
'--train_batch_size',
help='Number of examples to compute gradient on',
type=int,
default=256 # originally 64
)
parser.add_argument(
'--nbuckets',
help='Number of bins into which to discretize lats and lons',
type=int,
default=10 # originally 5
)
parser.add_argument(
'--nembeds',
help='Embedding dimension for categorical variables',
type=int,
default=3
)
parser.add_argument(
'--num_epochs',
help='Number of epochs (used only if --develop is not set)',
type=int,
default=10
)
parser.add_argument(
'--dnn_hidden_units',
help='Architecture of DNN part of wide-and-deep network',
default='64,64,64,8' # originally '64,32'
)
parser.add_argument(
'--develop',
help='Train on a small subset in development',
dest='develop',
action='store_true')
parser.set_defaults(develop=False)
parser.add_argument(
'--skip_full_eval',
help='Just train. Do not evaluate on test dataset.',
dest='skip_full_eval',
action='store_true')
parser.set_defaults(skip_full_eval=False)
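    # Example local invocation (bucket name is illustrative):
    #   python3 model.py --bucket my-bucket --develop
    # A full run might add e.g. --num_epochs 10 --train_batch_size 256 --nbuckets 10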
# parse args
args = parser.parse_args().__dict__
# The Vertex AI contract. If not running in Vertex AI Training, these will be None
OUTPUT_MODEL_DIR = os.getenv("AIP_MODEL_DIR") # or None
TRAIN_DATA_PATTERN = os.getenv("AIP_TRAINING_DATA_URI")
EVAL_DATA_PATTERN = os.getenv("AIP_VALIDATION_DATA_URI")
TEST_DATA_PATTERN = os.getenv("AIP_TEST_DATA_URI")
# set top-level output directory for checkpoints, etc.
BUCKET = args['bucket']
OUTPUT_DIR = 'gs://{}/ch9/train_output'.format(BUCKET)
    # During hyperparameter tuning, different trials must not clobber each other's output.
    # https://cloud.google.com/ai-platform/training/docs/distributed-training-details#tf-config-format
    # The TF_CONFIG trial id used below does not exist in Vertex AI, so this stays commented out:
# OUTPUT_DIR = os.path.join(
# OUTPUT_DIR,
# json.loads(
# os.environ.get('TF_CONFIG', '{}')
# ).get('task', {}).get('trial', '')
# )
if OUTPUT_MODEL_DIR:
# convert gs://ai-analytics-solutions-dsongcp2/aiplatform-custom-job-2021-11-13-22:22:46.175/1/model/
# to gs://ai-analytics-solutions-dsongcp2/aiplatform-custom-job-2021-11-13-22:22:46.175/1
OUTPUT_DIR = os.path.join(
os.path.dirname(OUTPUT_MODEL_DIR if OUTPUT_MODEL_DIR[-1] != '/' else OUTPUT_MODEL_DIR[:-1]),
'train_output')
logging.info('Writing checkpoints and other outputs to {}'.format(OUTPUT_DIR))
# Set default values for the contract variables in case we are not running in Vertex AI Training
if not OUTPUT_MODEL_DIR:
OUTPUT_MODEL_DIR = os.path.join(OUTPUT_DIR,
'export/flights_{}'.format(time.strftime("%Y%m%d-%H%M%S")))
if not TRAIN_DATA_PATTERN:
TRAIN_DATA_PATTERN = 'gs://{}/ch9/data/train*'.format(BUCKET)
CSV_COLUMNS.pop() # the data_split column won't exist
CSV_COLUMN_TYPES.pop() # the data_split column won't exist
if not EVAL_DATA_PATTERN:
EVAL_DATA_PATTERN = 'gs://{}/ch9/data/eval*'.format(BUCKET)
    logging.info('Reading training data from {}'.format(TRAIN_DATA_PATTERN))
    logging.info('Reading evaluation data from {}'.format(EVAL_DATA_PATTERN))
    logging.info('Writing trained model to {}'.format(OUTPUT_MODEL_DIR))
# other global parameters
NUM_BUCKETS = args['nbuckets']
NUM_EMBEDS = args['nembeds']
NUM_EXAMPLES = args['num_examples']
NUM_EPOCHS = args['num_epochs']
TRAIN_BATCH_SIZE = args['train_batch_size']
DNN_HIDDEN_UNITS = args['dnn_hidden_units']
DEVELOP_MODE = args['develop']
SKIP_FULL_EVAL = args['skip_full_eval']
# run
train_and_evaluate(TRAIN_DATA_PATTERN, EVAL_DATA_PATTERN, TEST_DATA_PATTERN, OUTPUT_MODEL_DIR, OUTPUT_DIR)
logging.info("Done")