quests/serverlessml/06_feateng_keras/labs/taxifare_fc.ipynb (726 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Feature Engineering in Keras.\n", "\n", "This is a continuation of our [first Keras models](../../03_keras/solution/keras_dnn.ipynb) we created earlier but now with more feature engineering.\n", "\n", "### Learning objectives\n", "1. Use tf.data to read the CSV files\n", "2. Create new feature columns for better predictive power\n", "3. Build, train, and evaluate a new Keras DNN\n", "4. Make example predictions\n", "5. Export the model in preparation for serving later \n", "\n", "Each learning objective will correspond to a __#TODO__ in this student lab notebook -- try to complete this notebook first and then review the [solution notebook](../solution/taxifare_fc.ipynb). \n", "\n", "Let's start off with the Python imports that we need." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!sudo chown -R jupyter:jupyter /home/jupyter/training-data-analyst" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip install tensorflow==2.1 --user" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Please ignore any compatibility warnings and errors.\n", "Make sure to <b>restart</b> your kernel to ensure this change has taken place." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "export PROJECT=$(gcloud config list project --format \"value(core.project)\")\n", "echo \"Your current GCP Project Name is: \"$PROJECT\n", "gcloud config set ai_platform/region global" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os, json, math, shutil\n", "import datetime\n", "import numpy as np\n", "import logging\n", "# SET TF ERROR LOG VERBOSITY\n", "logging.getLogger(\"tensorflow\").setLevel(logging.ERROR)\n", "import tensorflow as tf\n", "print(tf.version.VERSION)\n", "\n", "PROJECT = \"your-gcp-project-here\" # REPLACE WITH YOUR PROJECT NAME\n", "REGION = \"us-central1\" # REPLACE WITH YOUR BUCKET REGION e.g. us-central1\n", "\n", "# Do not change these\n", "os.environ[\"PROJECT\"] = PROJECT\n", "os.environ[\"REGION\"] = REGION\n", "os.environ[\"BUCKET\"] = PROJECT # DEFAULT BUCKET WILL BE PROJECT ID\n", "\n", "if PROJECT == \"your-gcp-project-here\":\n", " print(\"Don't forget to update your PROJECT name! Currently:\", PROJECT)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a new Google Cloud Storage Bucket for ML model exports" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", " \n", "## Create new ML GCS bucket if it doesn't exist already...\n", "exists=$(gsutil ls -d | grep -w gs://${PROJECT}-ml/)\n", "\n", "if [ -n \"$exists\" ]; then\n", " echo -e \"Bucket exists, let's not recreate it.\"\n", " \n", "else\n", " echo \"Creating a new GCS bucket.\"\n", " gsutil mb -l ${REGION} gs://${PROJECT}-ml\n", " echo -e \"\\nHere are your current buckets:\"\n", " gsutil ls\n", "fi" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup parameters for notebook scheduling" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "parameters" ] }, "outputs": [], "source": [ "# Note that this cell is special. It's got a tag (you can view tags by clicking on the wrench icon on the left menu in Jupyter)\n", "# These are parameters that we will configure so that we can schedule this notebook\n", "DATADIR = '../../data'\n", "OUTDIR = './trained_model'\n", "NBUCKETS = 10 # for feature crossing\n", "TRAIN_BATCH_SIZE = 32\n", "NUM_TRAIN_EXAMPLES = 10000 * 5 # remember the training dataset repeats, so this will wrap around\n", "NUM_EVALS = 5 # evaluate this many times\n", "NUM_EVAL_EXAMPLES = 10000 # enough to get a reasonable sample, but no so much that it slows down" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Locating the CSV files\n", "\n", "We will start with the CSV files that we wrote out in the [first notebook](../01_explore/solution/taxifare.iypnb) of this sequence. Just so you don't have to run the notebook, we saved a copy in ../data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "display" ] }, "outputs": [], "source": [ "if DATADIR[:5] == 'gs://':\n", " !gsutil ls $DATADIR/*.csv\n", "else:\n", " !ls -l $DATADIR/*.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use tf.data to read the CSV files\n", "\n", "We wrote these cells in the [third notebook](../03_tfdata/solution/input_pipeline.ipynb) of this sequence." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "CSV_COLUMNS = ['fare_amount', 'pickup_datetime',\n", " 'pickup_longitude', 'pickup_latitude', \n", " 'dropoff_longitude', 'dropoff_latitude', \n", " 'passenger_count', 'key']\n", "LABEL_COLUMN = 'fare_amount'\n", "DEFAULTS = [[0.0],['na'],[0.0],[0.0],[0.0],[0.0],[0.0],['na']]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def features_and_labels(row_data):\n", " for unwanted_col in ['key']: # keep the pickup_datetime!\n", " row_data.pop(unwanted_col)\n", " label = row_data.pop(LABEL_COLUMN)\n", " return row_data, label # features, label\n", "\n", "# load the training data\n", "def load_dataset(pattern, batch_size=1, mode=tf.estimator.ModeKeys.EVAL):\n", " pattern = '{}/{}'.format(DATADIR, pattern)\n", " dataset = (\n", " # TODO 1: Recall from earlier how you used tf.data to read the CSV files (no changes needed):\n", " tf.data.experimental.make_csv_dataset(pattern, batch_size, CSV_COLUMNS, DEFAULTS)\n", " .map(features_and_labels) # features, label\n", " )\n", " if mode == tf.estimator.ModeKeys.TRAIN:\n", " print(\"Repeating training dataset indefinitely\")\n", " dataset = dataset.shuffle(1000).repeat()\n", " dataset = dataset.prefetch(1) # take advantage of multi-threading; 1=AUTOTUNE\n", " return dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def parse_datetime(s):\n", " if type(s) is not str:\n", " s = s.numpy().decode('utf-8') # if it is a Tensor\n", " return datetime.datetime.strptime(s, \"%Y-%m-%d %H:%M:%S %Z\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "display" ] }, "outputs": [], "source": [ "for s in ['2012-07-05 14:18:00 UTC']:\n", " print(s)\n", " for ts in [parse_datetime(s), parse_datetime(tf.constant(s))]: # as string, as tensor\n", " print(ts.weekday())\n", " DAYS = ['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']\n", " print(DAYS[ts.weekday()])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Add data transformations and build the new DNN" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Add transformations\n", "def euclidean(params):\n", " lon1, lat1, lon2, lat2 = params\n", " \n", " # TODO 2: Create two new features called londiff and latdiff\n", " # These should be the difference between lon - lon and lat - lat\n", " londiff = \n", " latdiff = \n", " return tf.sqrt(londiff*londiff + latdiff*latdiff)\n", "\n", "DAYS = ['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat']\n", "def get_dayofweek(s):\n", " ts = parse_datetime(s)\n", " return DAYS[ts.weekday()]\n", "\n", "@tf.function\n", "def dayofweek(ts_in):\n", " return tf.map_fn(\n", " lambda s: tf.py_function(get_dayofweek, inp=[s], Tout=tf.string),\n", " ts_in\n", " )\n", "\n", "@tf.function\n", "def fare_thresh(x):\n", " return 60 * tf.keras.activations.relu(x)\n", "\n", "def transform(inputs, NUMERIC_COLS, STRING_COLS):\n", " print(\"BEFORE TRANSFORMATION\")\n", " print(\"INPUTS:\", inputs.keys())\n", " \n", " # Pass-through columns\n", " transformed = inputs.copy()\n", " del transformed['pickup_datetime']\n", " \n", " feature_columns = {\n", " colname: tf.feature_column.numeric_column(colname)\n", " for colname in NUMERIC_COLS\n", " }\n", " \n", " # scale the lat, lon values to be in 0, 1\n", " if True:\n", " for lon_col in ['pickup_longitude', 'dropoff_longitude']: # in range -70 to -78\n", " transformed[lon_col] = tf.keras.layers.Lambda(\n", " lambda x: (x+78)/8.0, \n", " name='scale_{}'.format(lon_col)\n", " )(inputs[lon_col])\n", " for lat_col in ['pickup_latitude', 'dropoff_latitude']: # in range 37 to 45\n", " transformed[lat_col] = tf.keras.layers.Lambda(\n", " lambda x: (x-37)/8.0, \n", " name='scale_{}'.format(lat_col)\n", " )(inputs[lat_col])\n", "\n", " # add Euclidean distance. Doesn't have to be accurate calculation because NN will calibrate it\n", " if True:\n", " transformed['euclidean'] = tf.keras.layers.Lambda(euclidean, name='euclidean')([\n", " inputs['pickup_longitude'],\n", " inputs['pickup_latitude'],\n", " inputs['dropoff_longitude'],\n", " inputs['dropoff_latitude']\n", " ])\n", " feature_columns['euclidean'] = tf.feature_column.numeric_column('euclidean')\n", " \n", " # hour of day from timestamp of form '2010-02-08 09:17:00+00:00'\n", " if True:\n", " transformed['hourofday'] = tf.keras.layers.Lambda(\n", " lambda x: tf.strings.to_number(tf.strings.substr(x, 11, 2), out_type=tf.dtypes.int32),\n", " name='hourofday'\n", " )(inputs['pickup_datetime'])\n", " feature_columns['hourofday'] = tf.feature_column.indicator_column(\n", " tf.feature_column.categorical_column_with_identity('hourofday', num_buckets=24))\n", "\n", " if False:\n", " # day of week is hard because there is no TensorFlow function for date handling\n", " transformed['dayofweek'] = tf.keras.layers.Lambda(\n", " lambda x: dayofweek(x),\n", " name='dayofweek_pyfun'\n", " )(inputs['pickup_datetime'])\n", " transformed['dayofweek'] = tf.keras.layers.Reshape((), name='dayofweek')(transformed['dayofweek'])\n", " feature_columns['dayofweek'] = tf.feature_column.indicator_column(\n", " tf.feature_column.categorical_column_with_vocabulary_list(\n", " 'dayofweek', vocabulary_list = DAYS))\n", " \n", " if True:\n", " # featurecross lat, lon into nxn buckets, then embed\n", " nbuckets = NBUCKETS\n", " latbuckets = np.linspace(0, 1, nbuckets).tolist()\n", " lonbuckets = np.linspace(0, 1, nbuckets).tolist()\n", " b_plat = tf.feature_column.bucketized_column(feature_columns['pickup_latitude'], latbuckets)\n", " b_dlat = tf.feature_column.bucketized_column(feature_columns['dropoff_latitude'], latbuckets)\n", " b_plon = tf.feature_column.bucketized_column(feature_columns['pickup_longitude'], lonbuckets)\n", " b_dlon = tf.feature_column.bucketized_column(feature_columns['dropoff_longitude'], lonbuckets)\n", " ploc = tf.feature_column.crossed_column([b_plat, b_plon], nbuckets * nbuckets)\n", " dloc = tf.feature_column.crossed_column([b_dlat, b_dlon], nbuckets * nbuckets)\n", " pd_pair = tf.feature_column.crossed_column([ploc, dloc], nbuckets ** 4 )\n", " feature_columns['pickup_and_dropoff'] = tf.feature_column.embedding_column(pd_pair, 100)\n", "\n", " print(\"AFTER TRANSFORMATION\")\n", " print(\"TRANSFORMED:\", transformed.keys())\n", " print(\"FEATURES\", feature_columns.keys()) \n", " return transformed, feature_columns\n", "\n", "def rmse(y_true, y_pred):\n", " return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true))) \n", "\n", "def build_dnn_model():\n", " # input layer is all float except for pickup_datetime which is a string\n", " STRING_COLS = ['pickup_datetime']\n", " NUMERIC_COLS = set(CSV_COLUMNS) - set([LABEL_COLUMN, 'key']) - set(STRING_COLS)\n", " print(STRING_COLS)\n", " print(NUMERIC_COLS)\n", " inputs = {\n", " colname : tf.keras.layers.Input(name=colname, shape=(), dtype='float32')\n", " for colname in NUMERIC_COLS\n", " }\n", " inputs.update({\n", " colname : tf.keras.layers.Input(name=colname, shape=(), dtype='string')\n", " for colname in STRING_COLS\n", " })\n", " \n", " # transforms\n", " transformed, feature_columns = transform(inputs, NUMERIC_COLS, STRING_COLS)\n", " \n", " # TODO 3: Specify the dense feature layers for the DNN as inputs\n", " # Tip: Refer to https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/keras/layers/DenseFeatures\n", " dnn_inputs = \n", "\n", " # two hidden layers of [32, 8] just in like the BQML DNN\n", " h1 = tf.keras.layers.Dense(32, activation='relu', name='h1')(dnn_inputs)\n", " h2 = tf.keras.layers.Dense(8, activation='relu', name='h2')(h1)\n", "\n", " if False:\n", " # final output would normally have a linear activation because this is regression\n", " # However, we know something about the taxi problem (fares are +ve and tend to be below $60).\n", " # Use that here. (You can verify by running this query):\n", " output = tf.keras.layers.Dense(1, activation=fare_thresh, name='fare')(h2)\n", " else:\n", " output = tf.keras.layers.Dense(1, name='fare')(h2)\n", " \n", " model = tf.keras.models.Model(inputs, output)\n", " model.compile(optimizer='adam', loss='mse', metrics=[rmse, 'mse'])\n", " return model\n", "\n", "model = build_dnn_model()\n", "print(model.summary())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize the DNN model layers" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "display" ] }, "outputs": [], "source": [ "tf.keras.utils.plot_model(model, 'dnn_model.png', show_shapes=False, rankdir='LR')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train the model\n", "\n", "To train the model, call [model.fit()](https://keras.io/models/model/#fit)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "trainds = load_dataset('taxi-train*', TRAIN_BATCH_SIZE, tf.estimator.ModeKeys.TRAIN)\n", "evalds = load_dataset('taxi-valid*', 1000, tf.estimator.ModeKeys.EVAL).take(NUM_EVAL_EXAMPLES//10000) # evaluate on 1/10 final evaluation set\n", "\n", "steps_per_epoch = NUM_TRAIN_EXAMPLES // (TRAIN_BATCH_SIZE * NUM_EVALS)\n", "\n", "shutil.rmtree('{}/checkpoints/'.format(OUTDIR), ignore_errors=True)\n", "checkpoint_path = '{}/checkpoints/taxi'.format(OUTDIR)\n", "cp_callback = tf.keras.callbacks.ModelCheckpoint(checkpoint_path, \n", " save_weights_only=True,\n", " verbose=1)\n", "\n", "history = model.fit(trainds, \n", " validation_data=evalds,\n", " epochs=NUM_EVALS, \n", " steps_per_epoch=steps_per_epoch,\n", " verbose=2, # 0=silent, 1=progress bar, 2=one line per epoch\n", " callbacks=[cp_callback])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize the Model Loss Curves" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "display" ] }, "outputs": [], "source": [ "# plot\n", "import matplotlib.pyplot as plt\n", "nrows = 1\n", "ncols = 2\n", "fig = plt.figure(figsize=(10, 5))\n", "\n", "for idx, key in enumerate(['loss', 'rmse']):\n", " ax = fig.add_subplot(nrows, ncols, idx+1)\n", " plt.plot(history.history[key])\n", " plt.plot(history.history['val_{}'.format(key)])\n", " plt.title('model {}'.format(key))\n", " plt.ylabel(key)\n", " plt.xlabel('epoch')\n", " plt.legend(['train', 'validation'], loc='upper left');" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Evaluate over the full validation dataset\n", "\n", "Let's evaluate over the full validation dataset (provided the validation dataset is large enough)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "evalds = load_dataset('taxi-valid*', 1000, tf.estimator.ModeKeys.EVAL).take(NUM_EVAL_EXAMPLES//1000)\n", "model.evaluate(evalds)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Although we get RMSE of around 10 (your answer will be different due to random seeds), remember that we trained on a really small subset of the data. We need a larger training dataset before making decisions about this model." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Predict with the new model\n", "\n", "This is how to predict with this model:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "display" ] }, "outputs": [], "source": [ "# TODO 4: Make example predictions. Experiment with different passenger_counts and pickup times and re-run.\n", "model.predict({\n", " 'pickup_longitude': tf.convert_to_tensor([-73.982683]),\n", " 'pickup_latitude': tf.convert_to_tensor([40.742104]),\n", " 'dropoff_longitude': tf.convert_to_tensor([-73.983766]),\n", " 'dropoff_latitude': tf.convert_to_tensor([40.755174]),\n", " 'passenger_count': tf.convert_to_tensor([3.0]),\n", " 'pickup_datetime': tf.convert_to_tensor(['2010-02-08 09:17:00 UTC'], dtype=tf.string),\n", "}, steps=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, this is not realistic, because we can't expect client code to have a model object in memory. We'll have to export our model to a file, and expect client code to instantiate the model from that exported file." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Export the model for serving later\n", "\n", "Let's export the model to a TensorFlow SavedModel format. Once we have a model in this format, we have lots of ways to \"serve\" the model, from a web application, from JavaScript, from mobile applications, etc." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import shutil, os, datetime\n", "OUTPUT_DIR = os.path.join(OUTDIR, 'export/savedmodel')\n", "if OUTPUT_DIR[:5] != 'gs://':\n", " shutil.rmtree(OUTPUT_DIR, ignore_errors=True)\n", "EXPORT_PATH = os.path.join(OUTPUT_DIR, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))\n", "\n", "# TODO 5: Export the model in preparation for serving later\n", "# Specify the model and export path to save to\n", "# Tip: Refer to: https://www.tensorflow.org/versions/r2.0/api_docs/python/tf/saved_model/save\n", "tf. # <--- complete" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!saved_model_cli show --tag_set serve --signature_def serving_default --dir {EXPORT_PATH}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!find {EXPORT_PATH}\n", "os.environ['EXPORT_PATH'] = EXPORT_PATH" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Serve the model on Cloud AI Platform" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note**: If required, enable **AI Platform Training & Prediction API**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "PROJECT=${PROJECT}\n", "BUCKET=${PROJECT}-ml\n", "REGION=us-east1\n", "MODEL_NAME=taxifare\n", "VERSION_NAME=v2\n", "\n", "if [[ $(gcloud ai-platform models list --format='value(name)' | grep $MODEL_NAME) ]]; then\n", " echo \"$MODEL_NAME already exists\"\n", "else\n", " # create model\n", " echo \"Creating $MODEL_NAME\"\n", " gcloud ai-platform models create --regions=$REGION $MODEL_NAME\n", "fi\n", "\n", "if [[ $(gcloud ai-platform versions list --model $MODEL_NAME --format='value(name)' | grep $VERSION_NAME) ]]; then\n", " echo \"Deleting already existing $MODEL_NAME:$VERSION_NAME ... \"\n", " gcloud ai-platform versions delete --model=$MODEL_NAME $VERSION_NAME\n", " echo \"Please run this cell again if you don't see a Creating message ... \"\n", " sleep 10\n", "fi\n", "\n", "# create model\n", "echo \"Creating $MODEL_NAME:$VERSION_NAME\"\n", "gcloud ai-platform versions create --model=$MODEL_NAME $VERSION_NAME --async \\\n", " --framework=tensorflow --python-version=3.7 --runtime-version=2.1 \\\n", " --origin=$EXPORT_PATH --staging-bucket=gs://$BUCKET" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook, we have looked at how to implement a custom Keras model using feature columns." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Make predictions using `gcloud ai-platform predict`" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile repro.json\n", "{\"pickup_longitude\": -73.982683, \"pickup_latitude\": 40.742104, \"dropoff_longitude\": -73.983766, \"dropoff_latitude\": 40.755174, \"passenger_count\": 3.0, \"pickup_datetime\": \"2010-02-08 09:17:00 UTC\"}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Note**: If you get HTTP request failed error, wait for a while to complete the CreateVersion and re-run the below cell" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!gcloud ai-platform predict --model taxifare --json-instances repro.json --version v2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright 2022 Google Inc.\n", "Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at\n", "http://www.apache.org/licenses/LICENSE-2.0\n", "Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }