courses/machine_learning/asl/02_tensorflow/e_traineval.ipynb (481 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Introducing tf.estimator.train_and_evaluate()\n",
"**Learning Objectives**\n",
"- Introduce new type of input function (`serving_input_reciever_fn()`) which supports remote access to our model via REST API\n",
"- Use the `tf.estimator.train_and_evaluate()` method to periodically evaluate *during* training\n",
"- Practice using TensorBoard to visualize training and evaluation loss curves"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Introduction \n",
"\n",
"In this notebook, we'll see how to use the `train_and_evaluate` method within `tf.estimator` to train and evaluate our machin learning model. \n",
"\n",
"Run the following cell and reset the session if needed:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import tensorflow as tf\n",
"import shutil\n",
"print(tf.__version__)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train and Evaluate Input Functions\n",
"\n",
"We'll use the same train and evaluation input functions that we created before. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"CSV_COLUMN_NAMES = [\"fare_amount\",\"dayofweek\",\"hourofday\",\"pickuplon\",\"pickuplat\",\"dropofflon\",\"dropofflat\"]\n",
"CSV_DEFAULTS = [[0.0],[1],[0],[-74.0], [40.0], [-74.0], [40.7]]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def parse_row(row):\n",
" fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)\n",
" features = dict(zip(CSV_COLUMN_NAMES, fields))\n",
" label = features.pop(\"fare_amount\")\n",
" return features, label\n",
"\n",
"def read_dataset(csv_path):\n",
" dataset = tf.data.TextLineDataset(filenames = csv_path).skip(count = 1) # skip header\n",
" dataset = dataset.map(map_func = parse_row)\n",
" return dataset\n",
"\n",
"def train_input_fn(csv_path, batch_size = 128):\n",
" dataset = read_dataset(csv_path)\n",
" dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)\n",
" return dataset\n",
"\n",
"def eval_input_fn(csv_path, batch_size = 128):\n",
" dataset = read_dataset(csv_path)\n",
" dataset = dataset.batch(batch_size = batch_size)\n",
" return dataset"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Feature Columns\n",
"\n",
"We also create the feature columns for the model the same as before."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"FEATURE_NAMES = CSV_COLUMN_NAMES[1:] # all but first column\n",
"\n",
"feature_cols = [tf.feature_column.numeric_column(key = k) for k in FEATURE_NAMES]\n",
"print(feature_cols)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Serving Input Receiver Function \n",
"\n",
"In a prior notebook we used the `estimator.predict()` function to get taxifare predictions. This worked fine because we had done our model training on the same machine. \n",
"\n",
"However in a production setting this won't usually be the case. Our clients may be remote web servers, mobile apps and more. Instead of having to ship our model files to every client, it would be better to host our model in one place, and make it remotely accesible for prediction requests using a REST API.\n",
"\n",
"The TensorFlow solution for this is a project called [TF Serving](https://www.tensorflow.org/serving/), which is part of the larger [Tensorflow Extended (TFX)](https://www.tensorflow.org/tfx/) platform that extends TensorFlow for production environments. \n",
"\n",
"The interface between TensorFlow and TF Serving is a `serving_input_receiver_fn()`. It has two jobs:\n",
"- To add `tf.placeholder`s to the graph to specify what type of tensors TF Serving should recieve during inference requests. The placeholders are specified as a dictionary object\n",
"- To add any additional ops needed to convert data from the client into the tensors expected by the model.\n",
"\n",
"The function must return a `tf.estimator.export.ServingInputReceiver` object, which packages the placeholders and the neccesary transformations together."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the cell below, we implement the `serving_input_receiver_fn` function that returns an instance of\n",
"`tf.estimator.export.ServingInputReceiver(features, receiver_tensors)`. Have a look at [the documentation for Tensorflow's ServingInputReceiver](https://www.tensorflow.org/api_docs/python/tf/estimator/export/ServingInputReceiver). Here, `receiver_tensors` is a dictionary describing the JSON object received by the Cloud ML Engine API, and is a dictionary `features` that has the structure as the feature dictionary accepted by our estimator.\n",
"\n",
"We keep things simple by assuming that the API receives a JSON object that has already the correct structure\n",
"(i.e. `features = receiver_tensors`)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def serving_input_receiver_fn():\n",
" receiver_tensors = {\n",
" 'dayofweek' : tf.placeholder(dtype = tf.int32, shape = [None]),\n",
" 'hourofday' : tf.placeholder(dtype = tf.int32, shape = [None]),\n",
" 'pickuplon' : tf.placeholder(dtype = tf.float32, shape = [None]), \n",
" 'pickuplat' : tf.placeholder(dtype = tf.float32, shape = [None]),\n",
" 'dropofflat' : tf.placeholder(dtype = tf.float32, shape = [None]),\n",
" 'dropofflon' : tf.placeholder(dtype = tf.float32, shape = [None]),\n",
" }\n",
" \n",
" features = receiver_tensors \n",
" return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)"
]
},
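{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the shape of a prediction request concrete, here is a minimal sketch of a JSON body whose keys mirror the placeholders above. The feature values are made up, and the `instances` wrapper is an assumption based on the row format of the TF Serving REST predict API; adjust it if your endpoint expects something else.\n",
"\n",
"```python\n",
"import json\n",
"\n",
"# Hypothetical ride; the keys mirror the placeholders in serving_input_receiver_fn.\n",
"instance = {\n",
"    'dayofweek': 3,\n",
"    'hourofday': 17,\n",
"    'pickuplon': -73.99,\n",
"    'pickuplat': 40.75,\n",
"    'dropofflat': 40.65,\n",
"    'dropofflon': -73.78,\n",
"}\n",
"\n",
"# Assumption: the endpoint accepts a list of rows under the key 'instances'.\n",
"request_body = json.dumps({'instances': [instance]})\n",
"print(request_body)\n",
"```\n",
"\n",
"Each key maps to one scalar here because the placeholders have shape `[None]`, so the batch dimension comes from the number of rows sent in one request."
]
},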
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Train and Evaluate\n",
"\n",
"One issue with the previous notebooks is we only evaluate on our validation data once training is complete. This means we can't tell at what point overfitting began. What we really want is to evaluate at specified intervals *during* the training phase.\n",
"\n",
"The Estimator API way of doing this is to replace `estimator.train()` and `estimator.evaluate()` with `estimator.train_and_evaluate()`. This causes an evaluation to be done after every training checkpoint. However by default Tensorflow only checkpoints once every 10 minutes. Since this is less than the length of our total training we'd end up with the same behavior as before which is just one evaluation at the end of training. \n",
"\n",
"To remedy this we speciy in the `tf.estimator.RunConfig()` that TensorFlow should checkpoint every 100 steps.\n",
"\n",
"The default evaluation metric `average_loss` is MSE, but we want RMSE. Previously we just took the square root of the final `average_loss`. However it would be better if we could calculate RMSE not just at the end, but for every intermediate checkpoint and plot the change over time in TensorBoard. [`tf.contrib.estimator.add_metrics()`](https://www.tensorflow.org/api_docs/python/tf/contrib/estimator/add_metrics) allows us to do this. We wrap our estimator with it, and provide a custom evaluation function.\n",
"\n",
"`train_and_evaluate()` also allows us to use our `serving_input_receiver_fn()` to export our models in the SavedModel format required by TF Serving.\n",
"\n",
"*Note: Training will be slower than the last notebook because we are now evaluating after every 100 train steps. Previously we didn't evaluate until training finished.*"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the cell below, we create a instance of `tf.estimator.RunConfig` named `config` and pass to its \n",
"constructor information concerning:\n",
" - the directory where we want the trained model and its checkpoints to be saved\n",
" - the random seed which we want to be set to 1\n",
" - the cadence at which we want the model to create checkpoints (every 100 steps)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"OUTDIR = \"taxi_trained\"\n",
"\n",
"config = tf.estimator.RunConfig(\n",
" model_dir = OUTDIR,\n",
" tf_random_seed = 1,\n",
" save_checkpoints_steps = 100\n",
")"
]
},
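{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough illustration of what this cadence means, the sketch below lists the step-based checkpoints we would expect for a 500-step run (the `max_steps` value used later in this notebook). It is plain Python arithmetic, not a TensorFlow call. `expected_checkpoint_steps` is a hypothetical helper, and it ignores the initial checkpoint that the Estimator may write when training starts; exact step numbers can also differ by one between TensorFlow versions.\n",
"\n",
"```python\n",
"def expected_checkpoint_steps(max_steps, save_checkpoints_steps):\n",
"    # Multiples of the cadence up to max_steps.\n",
"    steps = list(range(save_checkpoints_steps, max_steps + 1, save_checkpoints_steps))\n",
"    # Assumption: a final checkpoint is written when training stops.\n",
"    if not steps or steps[-1] != max_steps:\n",
"        steps.append(max_steps)\n",
"    return steps\n",
"\n",
"print(expected_checkpoint_steps(max_steps=500, save_checkpoints_steps=100))\n",
"```\n",
"\n",
"Since an evaluation can follow each checkpoint, this gives a feel for how many points the evaluation curve will have in TensorBoard."
]
},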
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next we create a `DNNRegressor` model with two layers of 10 neurons each using \n",
"the `RunConfig` instance and the `feature_cols` list you just created.\n",
"\n",
"Note that we do not pass the model directory directly to the estimator constructor, since that info should\n",
"already be wrapped into the `RunConfig` instance.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = tf.estimator.DNNRegressor(\n",
" hidden_units = [10,10], \n",
" feature_columns = feature_cols, \n",
" config = config\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Adding custom evaluation metrics\n",
"\n",
"If we want to add a custom evaluation metric (one not included automatically with the canned `DNNRegressor` estimator) we will can do that by wrapping our model with our custom metric function using the `contrib` function `.add_metrics`. We will implement a `my_rmse` function that\n",
"- takes as input a tensor of `labels` and a tensor of `predictions`\n",
"- returns a dictionary with the single key `rmse` and with value the root mean square error between the labels and the predictions\n",
"\n",
"You can have a look at this blog post by Lak Lakshmanan on [\"How to extend a canned TensorFlow Estimator\"](https://towardsdatascience.com/how-to-extend-a-canned-tensorflow-estimator-to-add-more-evaluation-metrics-and-to-pass-through-ddf66cd3047d) for more information."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def my_rmse(labels, predictions): \n",
" pred_values = tf.squeeze(input = predictions[\"predictions\"], axis = -1)\n",
" return {\n",
" \"rmse\": tf.metrics.root_mean_squared_error(labels = labels, predictions = pred_values)\n",
" }"
]
},
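{
"cell_type": "markdown",
"metadata": {},
"source": [
"The relationship between the default `average_loss` (MSE) and the RMSE we add here can be checked on a few numbers. The sketch below uses plain Python with made-up labels and predictions; it does not call TensorFlow and is only meant to show the arithmetic.\n",
"\n",
"```python\n",
"import math\n",
"\n",
"labels = [10.0, 12.0, 8.0]       # made-up fares\n",
"predictions = [9.0, 14.0, 8.0]   # made-up model outputs\n",
"\n",
"squared_errors = [(p - y) ** 2 for p, y in zip(predictions, labels)]\n",
"mse = sum(squared_errors) / len(squared_errors)   # mean squared error\n",
"rmse = math.sqrt(mse)                             # root mean squared error\n",
"print(mse, rmse)\n",
"```\n",
"\n",
"RMSE is expressed in the same units as the label, which makes it easier to interpret than MSE for fare amounts."
]
},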
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the following cell to add the custom metric you defined above to the `model`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = tf.contrib.estimator.add_metrics(estimator = model, metric_fn = my_rmse) "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Next we'll create an instance of a `tf.estimator.TrainSpec` using the `train_input_fn` defined at the top of this file and train our model \n",
"with a `max_steps` of 500. Note that the training data is loaded from `./taxi-train.csv`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"train_spec = tf.estimator.TrainSpec(\n",
" input_fn = lambda: train_input_fn(\"./taxi-train.csv\"),\n",
" max_steps = 500\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, we create an exporter using the `serving_input_receiver_fn` defined at the beginning of this notebook.\n",
"\n",
"You want to export the trained model and its checkpoints in the './exporter' subdirectory.\n",
"\n",
"Use `tf.estimator.FinalExport` to create the exporter intance. \n",
"\n",
"**Note:** You may alternatively use `tf.estimator.BestExporter`\n",
" to export at every checkpoint that has lower loss than the previous checkpoint, instead\n",
" of exporting only the last checkpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"exporter = tf.estimator.FinalExporter(name = \"exporter\", serving_input_receiver_fn = serving_input_receiver_fn) "
]
},
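{
"cell_type": "markdown",
"metadata": {},
"source": [
"For orientation, the sketch below composes the directory where an exporter named `exporter` is expected to write, assuming the Estimator convention of `<model_dir>/export/<exporter name>`. It only builds a path string; the timestamped subfolder that each export creates is not modeled.\n",
"\n",
"```python\n",
"import posixpath\n",
"\n",
"OUTDIR = 'taxi_trained'        # same value as the model directory above\n",
"exporter_name = 'exporter'     # the name passed to FinalExporter\n",
"\n",
"# Assumed layout: <model_dir>/export/<exporter name>\n",
"export_base = posixpath.join(OUTDIR, 'export', exporter_name)\n",
"print(export_base)\n",
"```\n",
"\n",
"This is the folder to look in once training has finished."
]
},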
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In the cell below, create an instance of an `EvalSpec` to which you specify\n",
"that \n",
"- the data should be loaded from `/.taxi-valid.csv` during evaluation (use the correct input function!)\n",
"- the exporter you defined above should be used\n",
"- the first evaluation should start after 1 second of training\n",
"- and then be repeated every 1 second\n",
"\n",
"**Note:** We use the checkpoint setting above because we want to evaluate after every checkpoint.\n",
"As long as checkpoints are > 1 sec apart this ensures the throttling never kicks in."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"eval_spec = tf.estimator.EvalSpec(\n",
" input_fn = lambda: eval_input_fn(\"./taxi-valid.csv\"),\n",
" steps = None,\n",
" start_delay_secs = 1,\n",
" throttle_secs = 1,\n",
" exporters = exporter,\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the following cell to start the training and evaluation as you specified them above:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"tf.logging.set_verbosity(tf.logging.INFO) \n",
"shutil.rmtree(path = OUTDIR, ignore_errors = True)\n",
"tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file\n",
"\n",
"tf.estimator.train_and_evaluate(estimator = model, \n",
" train_spec = train_spec, \n",
" eval_spec = eval_spec)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Inspect the export folder\n",
"\n",
"Now in the output directory, in addition to the checkpoint files, you'll see a subfolder called 'export'. This contains one or models in the SavedModel format which is compatible with TF Serving. In the next notebook we will deploy the SavedModel behind a production grade REST API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!ls -R taxi_trained/export"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Monitoring with TensorBoard \n",
"\n",
"[TensorBoard](https://www.tensorflow.org/guide/summaries_and_tensorboard) is a web UI that allows us to visualize various aspects of our model, including the training and evaluation loss curves. Although you won't see the loss curves yet, it is best to launch TensorBoard *before* you start training so that you may see them update during a long running training process.\n",
"\n",
"To get Tensorboard to work within a Deep Learning VM or Colab, we need to create a tunnel connection to your local machine. To do this we'll set up a tunnel connection with `ngrok`. Using ngrok we'll then create a tunnel connection to our virtual machine's port 6006. We can view the Tensorboard results by following the link provided by ngrok after executing the following cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"get_ipython().system_raw(\n",
" \"tensorboard --logdir {} --host 0.0.0.0 --port 6006 &\"\n",
" .format(OUTDIR)\n",
")\n",
"\n",
"get_ipython().system_raw(\"./assets/ngrok http 6006 &\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!curl -s http://localhost:4040/api/tunnels | python3 -c \\\n",
" \"import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Tensorboard cleanup\n",
"\n",
"To close the tunnel connection to Tensorboard, we can find the PIDs for ngrok and Tensorboard and stop them. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# this will kill the processes for Tensorboard\n",
"!ps aux | grep tensorboard | awk '{print $2}' | xargs kill"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# this will kill the processes for ngrok\n",
"!ps aux | grep ngrok | awk '{print $2}' | xargs kill"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Challenge exercise\n",
"\n",
"Modify your solution to the challenge exercise in c_dataset.ipynb appropriately."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}