
{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "<h1>Training Keras model on Cloud AI Platform</h1>\n", "\n", "This notebook illustrates distributed training on Cloud AI Platform. This uses Keras and requires TensorFlow 2.1." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Ensure the right version of TensorFlow is installed.\n", "!pip freeze | grep tensorflow==2.1" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "hJ7ByvoXzpVI" }, "source": [ "## Set up environment variables and load necessary libraries" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Set environment variables so that we can use them throughout the entire notebook. Change the bucket, project, and region below to match your own." ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "collapsed": true, "deletable": true, "editable": true, "jupyter": { "outputs_hidden": true } }, "outputs": [], "source": [ "# change these to try this notebook out\n", "BUCKET = 'cloud-training-demos-ml' # Replace with your bucket name\n", "PROJECT = 'cloud-training-demos' # Replace with your project-id\n", "REGION = 'us-central1'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "os.environ[\"PROJECT\"] = PROJECT\n", "os.environ[\"BUCKET\"] = BUCKET\n", "os.environ[\"REGION\"] = REGION\n", "os.environ[\"TFVERSION\"] = \"2.1\"\n", "os.environ[\"PYTHONVERSION\"] = \"3.7\"" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "mC9K9Dpx1ztf" }, "source": [ "Check that the Google BigQuery library is installed and if not, install it. " ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 609 }, "colab_type": "code", "id": "RZUQtASG10xO", "outputId": "5612d6b0-9730-476a-a28f-8fdc14f4ecde" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "google-cloud-bigquery==1.6.1\n" ] } ], "source": [ "%%bash\n", "sudo pip freeze | grep google-cloud-bigquery==1.6.1 || \\\n", "sudo pip install google-cloud-bigquery==1.6.1" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import os\n", "from google.cloud import bigquery" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "export PROJECT=$(gcloud config list project --format \"value(core.project)\")\n", "echo \"Your current GCP Project Name is: \"$PROJECT" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "L0-vOB4y2BJM" }, "source": [ "## The source dataset\n", "\n", "Our dataset is hosted in [BigQuery](https://cloud.google.com/bigquery/). The CDC's Natality data has details on US births from 1969 to 2008 and is a publicly available dataset, meaning anyone with a GCP account has access. Click [here](https://console.cloud.google.com/bigquery?project=bigquery-public-data&p=publicdata&d=samples&t=natality&page=table) to access the dataset.\n", "\n", "The natality dataset is relatively large at almost 138 million rows and 31 columns, but simple to understand. `weight_pounds` is the target, the continuous value we’ll train a model to predict." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a BigQuery Dataset and Google Cloud Storage Bucket \n", "\n", "A BigQuery dataset is a container for tables, views, and models built with BigQuery ML. Let's create one called __babyweight__. We'll do the same for a GCS bucket for our project too." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "\n", "# Create a BigQuery dataset for babyweight if it doesn't exist\n", "datasetexists=$(bq ls -d | grep -w babyweight)\n", "\n", "if [ -n \"$datasetexists\" ]; then\n", " echo -e \"BigQuery dataset already exists, let's not recreate it.\"\n", "\n", "else\n", " echo \"Creating BigQuery dataset titled: babyweight\"\n", " \n", " bq --location=US mk --dataset \\\n", " --description \"Babyweight\" \\\n", " $PROJECT:babyweight\n", " echo \"Here are your current datasets:\"\n", " bq ls\n", "fi\n", " \n", "## Create GCS bucket if it doesn't exist already...\n", "exists=$(gsutil ls -d | grep -w gs://${BUCKET}/)\n", "\n", "if [ -n \"$exists\" ]; then\n", " echo -e \"Bucket exists, let's not recreate it.\"\n", " \n", "else\n", " echo \"Creating a new GCS bucket.\"\n", " gsutil mb -l ${REGION} gs://${BUCKET}\n", " echo \"Here are your current buckets:\"\n", " gsutil ls\n", "fi" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "b2TuS1s9vREL" }, "source": [ "## Create the training and evaluation data tables\n", "\n", "Since there is already a publicly available dataset, we can simply create the training and evaluation data tables using this raw input data. First we are going to create a subset of the data limiting our columns to `weight_pounds`, `is_male`, `mother_age`, `plurality`, and `gestation_weeks` as well as some simple filtering and a column to hash on for repeatable splitting.\n", "\n", "* Note: The dataset in the create table code below is the one created previously, e.g. \"babyweight\"." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Preprocess and filter dataset\n", "\n", "We have some preprocessing and filtering we would like to do to get our data in the right format for training.\n", "\n", "Preprocessing:\n", "* Cast `is_male` from `BOOL` to `STRING`\n", "* Cast `plurality` from `INTEGER` to `STRING` where `[1, 2, 3, 4, 5]` becomes `[\"Single(1)\", \"Twins(2)\", \"Triplets(3)\", \"Quadruplets(4)\", \"Quintuplets(5)\"]`\n", "* Add `hashmonth` hashing on `year` and `month`\n", "\n", "Filtering:\n", "* Only want data for years later than `2000`\n", "* Only want baby weights greater than `0`\n", "* Only want mothers whose age is greater than `0`\n", "* Only want plurality to be greater than `0`\n", "* Only want the number of weeks of gestation to be greater than `0`" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "Empty DataFrame\n", "Columns: []\n", "Index: []" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%bigquery\n", "CREATE OR REPLACE TABLE\n", " babyweight.babyweight_data AS\n", "SELECT\n", " weight_pounds,\n", " CAST(is_male AS STRING) AS is_male,\n", " mother_age,\n", " CASE\n", " WHEN plurality = 1 THEN \"Single(1)\"\n", " WHEN plurality = 2 THEN \"Twins(2)\"\n", " WHEN plurality = 3 THEN \"Triplets(3)\"\n", " WHEN plurality = 4 THEN \"Quadruplets(4)\"\n", " WHEN plurality = 5 THEN \"Quintuplets(5)\"\n", " END AS plurality,\n", " 
gestation_weeks,\n", " FARM_FINGERPRINT(\n", " CONCAT(\n", " CAST(year AS STRING),\n", " CAST(month AS STRING)\n", " )\n", " ) AS hashmonth\n", "FROM\n", " publicdata.samples.natality\n", "WHERE\n", " year > 2000\n", " AND weight_pounds > 0\n", " AND mother_age > 0\n", " AND plurality > 0\n", " AND gestation_weeks > 0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Augment dataset to simulate missing data\n", "\n", "Now we want to augment our dataset with our simulated babyweight data by setting all gender information to `Unknown` and setting plurality of all non-single births to `Multiple(2+)`." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "Empty DataFrame\n", "Columns: []\n", "Index: []" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%bigquery\n", "CREATE OR REPLACE TABLE\n", " babyweight.babyweight_augmented_data AS\n", "SELECT\n", " weight_pounds,\n", " is_male,\n", " mother_age,\n", " plurality,\n", " gestation_weeks,\n", " hashmonth\n", "FROM\n", " babyweight.babyweight_data\n", "UNION ALL\n", "SELECT\n", " weight_pounds,\n", " \"Unknown\" AS is_male,\n", " mother_age,\n", " CASE\n", " WHEN plurality = \"Single(1)\" THEN plurality\n", " ELSE \"Multiple(2+)\"\n", " END AS plurality,\n", " gestation_weeks,\n", " hashmonth\n", "FROM\n", " babyweight.babyweight_data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Split augmented dataset into train and 
eval sets\n", "\n", "Using `hashmonth`, apply a modulo to get approximately a 75/25 train/eval split." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Split augmented dataset into train dataset" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "colab": {}, "colab_type": "code", "id": "CMNRractvREL" }, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "Empty DataFrame\n", "Columns: []\n", "Index: []" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%bigquery\n", "CREATE OR REPLACE TABLE\n", " babyweight.babyweight_data_train AS\n", "SELECT\n", " weight_pounds,\n", " is_male,\n", " mother_age,\n", " plurality,\n", " gestation_weeks\n", "FROM\n", " babyweight.babyweight_augmented_data\n", "WHERE\n", " ABS(MOD(hashmonth, 4)) < 3" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Split augmented dataset into eval dataset" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "Empty DataFrame\n", 
"Columns: []\n", "Index: []" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%bigquery\n", "CREATE OR REPLACE TABLE\n", " babyweight.babyweight_data_eval AS\n", "SELECT\n", " weight_pounds,\n", " is_male,\n", " mother_age,\n", " plurality,\n", " gestation_weeks\n", "FROM\n", " babyweight.babyweight_augmented_data\n", "WHERE\n", " ABS(MOD(hashmonth, 4)) = 3" ] }, { "cell_type": "markdown", "metadata": { "colab_type": "text", "id": "clnaaqQsXkwC" }, "source": [ "## Verify table creation\n", "\n", "Verify that you created the dataset and training data table." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>weight_pounds</th>\n", " <th>is_male</th>\n", " <th>mother_age</th>\n", " <th>plurality</th>\n", " <th>gestation_weeks</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "Empty DataFrame\n", "Columns: [weight_pounds, is_male, mother_age, plurality, gestation_weeks]\n", "Index: []" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%bigquery\n", "-- LIMIT 0 is a free query; this allows us to check that the table exists.\n", "SELECT * FROM babyweight.babyweight_data_train\n", "LIMIT 0" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " 
.dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>weight_pounds</th>\n", " <th>is_male</th>\n", " <th>mother_age</th>\n", " <th>plurality</th>\n", " <th>gestation_weeks</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ "Empty DataFrame\n", "Columns: [weight_pounds, is_male, mother_age, plurality, gestation_weeks]\n", "Index: []" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%bigquery\n", "-- LIMIT 0 is a free query; this allows us to check that the table exists.\n", "SELECT * FROM babyweight.babyweight_data_eval\n", "LIMIT 0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Export from BigQuery to CSVs in GCS\n", "\n", "Use BigQuery Python API to export our train and eval tables to Google Cloud Storage in the CSV format to be used later for TensorFlow/Keras training. We'll want to use the dataset we've been using above as well as repeat the process for both training and evaluation data." 
] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Exported qwiklabs-gcp-4b437f7e5bfff9dd:babyweight.babyweight_data_train to gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/train*.csv\n", "Exported qwiklabs-gcp-4b437f7e5bfff9dd:babyweight.babyweight_data_eval to gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/eval*.csv\n" ] } ], "source": [ "# Construct a BigQuery client object.\n", "client = bigquery.Client()\n", "\n", "dataset_name = \"babyweight\"\n", "\n", "# Create dataset reference object\n", "dataset_ref = client.dataset(\n", " dataset_id=dataset_name, project=client.project)\n", "\n", "# Export both train and eval tables\n", "for step in [\"train\", \"eval\"]:\n", " destination_uri = os.path.join(\n", " \"gs://\", BUCKET, dataset_name, \"data\", \"{}*.csv\".format(step))\n", " table_name = \"babyweight_data_{}\".format(step)\n", " table_ref = dataset_ref.table(table_name)\n", " extract_job = client.extract_table(\n", " table_ref,\n", " destination_uri,\n", " # Location must match that of the source table.\n", " location=\"US\",\n", " ) # API request\n", " extract_job.result() # Waits for job to complete.\n", "\n", " print(\"Exported {}:{}.{} to {}\".format(\n", " client.project, dataset_name, table_name, destination_uri))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Verify CSV creation\n", "\n", "Verify that we correctly created the CSV files in our bucket." 
] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/eval000000000000.csv\n", "gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/eval000000000001.csv\n", "gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/train000000000000.csv\n", "gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/train000000000001.csv\n", "gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/train000000000002.csv\n", "gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/train000000000003.csv\n", "gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/train000000000004.csv\n", "gs://qwiklabs-gcp-4b437f7e5bfff9dd/babyweight/data/train000000000005.csv\n" ] } ], "source": [ "%%bash\n", "gsutil ls gs://${BUCKET}/babyweight/data/*.csv" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Check data exists\n", "\n", "Verify that the CSV files we'll be using for training and evaluation exist." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "gsutil ls gs://${BUCKET}/babyweight/data/*000000000000.csv" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "mkdir -p babyweight/trainer\n", "touch babyweight/trainer/__init__.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We then use the `%%writefile` magic to write the contents of the cell below to a file called `task.py` in the `babyweight/trainer` folder." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create trainer module's task.py to hold hyperparameter argparsing code.\n", "\n", "The cell below writes the file `babyweight/trainer/task.py` which sets up our training job. Here is where we determine which parameters of our model to pass as flags during training using the `argparse` module. Look at how `batch_size` is passed to the model in the code below. 
Use this as an example to parse arguments for the following variables:\n", "- `nnsize` which represents the hidden layer sizes to use for DNN feature columns\n", "- `nembeds` which represents the embedding size of a cross of n key real-valued parameters\n", "- `train_examples` which represents the number of examples (in thousands) to run the training job\n", "- `eval_steps` which represents the positive number of steps for which to evaluate model\n", "\n", "Be sure to include a default value for the parsed arguments above and specify the `type` if necessary." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile babyweight/trainer/task.py\n", "import argparse\n", "import json\n", "import os\n", "\n", "from trainer import model\n", "\n", "import tensorflow as tf\n", "\n", "if __name__ == \"__main__\":\n", " parser = argparse.ArgumentParser()\n", " parser.add_argument(\n", " \"--job-dir\",\n", " help=\"this model ignores this field, but it is required by gcloud\",\n", " default=\"junk\"\n", " )\n", " parser.add_argument(\n", " \"--train_data_path\",\n", " help=\"GCS location of training data\",\n", " required=True\n", " )\n", " parser.add_argument(\n", " \"--eval_data_path\",\n", " help=\"GCS location of evaluation data\",\n", " required=True\n", " )\n", " parser.add_argument(\n", " \"--output_dir\",\n", " help=\"GCS location to write checkpoints and export models\",\n", " required=True\n", " )\n", " parser.add_argument(\n", " \"--batch_size\",\n", " help=\"Number of examples to compute gradient over.\",\n", " type=int,\n", " default=512\n", " )\n", " parser.add_argument(\n", " \"--nnsize\",\n", " help=\"Hidden layer sizes for DNN -- provide space-separated layers\",\n", " nargs=\"+\",\n", " type=int,\n", " default=[128, 32, 4]\n", " )\n", " parser.add_argument(\n", " \"--nembeds\",\n", " help=\"Embedding size of a cross of n key real-valued parameters\",\n", " type=int,\n", " default=3\n", " )\n", " 
parser.add_argument(\n", " \"--num_epochs\",\n", " help=\"Number of epochs to train the model.\",\n", " type=int,\n", " default=10\n", " )\n", " parser.add_argument(\n", " \"--train_examples\",\n", " help=\"\"\"Number of examples (in thousands) to run the training job over.\n", " If this is more than actual # of examples available, it cycles through\n", " them. So specifying 1000 here when you have only 100k examples makes\n", " this 10 epochs.\"\"\",\n", " type=int,\n", " default=5000\n", " )\n", " parser.add_argument(\n", " \"--eval_steps\",\n", " help=\"\"\"Positive number of steps for which to evaluate model. Default\n", " to None, which means to evaluate until input_fn raises an end-of-input\n", " exception\"\"\",\n", " type=int,\n", " default=None\n", " )\n", "\n", " # Parse all arguments\n", " args = parser.parse_args()\n", " arguments = args.__dict__\n", "\n", " # Unused args provided by service\n", " arguments.pop(\"job_dir\", None)\n", " arguments.pop(\"job-dir\", None)\n", "\n", " # Modify some arguments\n", " arguments[\"train_examples\"] *= 1000\n", "\n", " # Append trial_id to path if we are doing hptuning\n", " # This code can be removed if you are not using hyperparameter tuning\n", " arguments[\"output_dir\"] = os.path.join(\n", " arguments[\"output_dir\"],\n", " json.loads(\n", " os.environ.get(\"TF_CONFIG\", \"{}\")\n", " ).get(\"task\", {}).get(\"trial\", \"\")\n", " )\n", "\n", " # Run the training job\n", " model.train_and_evaluate(arguments)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the same way we can write to the file `model.py` the model that we developed in the previous notebooks. \n", "\n", "### Create trainer module's model.py to hold Keras model code.\n", "\n", "To create our `model.py`, we'll use the code we wrote for the Wide & Deep model. 
Look back at your [9_keras_wide_and_deep_babyweight](../solutions/9_keras_wide_and_deep_babyweight.ipynb) notebook and copy/paste the necessary code from that notebook into its place in the cell below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile babyweight/trainer/model.py\n", "import datetime\n", "import os\n", "import shutil\n", "import numpy as np\n", "import tensorflow as tf\n", "import hypertune\n", "\n", "# Determine CSV, label, and key columns\n", "CSV_COLUMNS = [\"weight_pounds\",\n", " \"is_male\",\n", " \"mother_age\",\n", " \"plurality\",\n", " \"gestation_weeks\"]\n", "LABEL_COLUMN = \"weight_pounds\"\n", "\n", "# Set default values for each CSV column.\n", "# Treat is_male and plurality as strings.\n", "DEFAULTS = [[0.0], [\"null\"], [0.0], [\"null\"], [0.0]]\n", "\n", "\n", "def features_and_labels(row_data):\n", " \"\"\"Splits features and labels from feature dictionary.\n", "\n", " Args:\n", " row_data: Dictionary of CSV column names and tensor values.\n", " Returns:\n", " Dictionary of feature tensors and label tensor.\n", " \"\"\"\n", " label = row_data.pop(LABEL_COLUMN)\n", "\n", " return row_data, label # features, label\n", "\n", "\n", "def load_dataset(pattern, batch_size=1, mode='eval'):\n", " \"\"\"Loads dataset using the tf.data API from CSV files.\n", "\n", " Args:\n", " pattern: str, file pattern to glob into list of files.\n", " batch_size: int, the number of examples per batch.\n", " mode: 'train' | 'eval' to determine if training or evaluating.\n", " Returns:\n", " `Dataset` object.\n", " \"\"\"\n", " print(\"mode = {}\".format(mode))\n", " # Make a CSV dataset\n", " dataset = tf.data.experimental.make_csv_dataset(\n", " file_pattern=pattern,\n", " batch_size=batch_size,\n", " column_names=CSV_COLUMNS,\n", " column_defaults=DEFAULTS)\n", "\n", " # Map dataset to features and label\n", " dataset = dataset.map(map_func=features_and_labels) # features, label\n", "\n", " # 
Shuffle and repeat for training\n", " if mode == 'train':\n", " dataset = dataset.shuffle(buffer_size=1000).repeat()\n", "\n", " # Prefetch one batch so input loading overlaps with training\n", " dataset = dataset.prefetch(buffer_size=1)\n", "\n", " return dataset\n", "\n", "\n", "def create_input_layers():\n", " \"\"\"Creates dictionary of input layers for each feature.\n", "\n", " Returns:\n", " Dictionary of `tf.keras.layers.Input` layers for each feature.\n", " \"\"\"\n", " deep_inputs = {\n", " colname: tf.keras.layers.Input(\n", " name=colname, shape=(), dtype=\"float32\")\n", " for colname in [\"mother_age\", \"gestation_weeks\"]\n", " }\n", "\n", " wide_inputs = {\n", " colname: tf.keras.layers.Input(\n", " name=colname, shape=(), dtype=\"string\")\n", " for colname in [\"is_male\", \"plurality\"]\n", " }\n", "\n", " inputs = {**wide_inputs, **deep_inputs}\n", "\n", " return inputs\n", "\n", "\n", "def categorical_fc(name, values):\n", " \"\"\"Helper function to wrap categorical feature by indicator column.\n", "\n", " Args:\n", " name: str, name of feature.\n", " values: list, list of strings of categorical values.\n", " Returns:\n", " Categorical and indicator column of categorical feature.\n", " \"\"\"\n", " cat_column = tf.feature_column.categorical_column_with_vocabulary_list(\n", " key=name, vocabulary_list=values)\n", " ind_column = tf.feature_column.indicator_column(\n", " categorical_column=cat_column)\n", "\n", " return cat_column, ind_column\n", "\n", "\n", "def create_feature_columns(nembeds):\n", " \"\"\"Creates wide and deep dictionaries of feature columns from inputs.\n", "\n", " Args:\n", " nembeds: int, number of dimensions to embed categorical column down to.\n", " Returns:\n", " Wide and deep dictionaries of feature columns.\n", " \"\"\"\n", " deep_fc = {\n", " colname: tf.feature_column.numeric_column(key=colname)\n", " for colname in [\"mother_age\", \"gestation_weeks\"]\n", " }\n", " wide_fc = {}\n", " is_male, wide_fc[\"is_male\"] = 
categorical_fc(\n", " \"is_male\", [\"True\", \"False\", \"Unknown\"])\n", " plurality, wide_fc[\"plurality\"] = categorical_fc(\n", " \"plurality\", [\"Single(1)\", \"Twins(2)\", \"Triplets(3)\",\n", " \"Quadruplets(4)\", \"Quintuplets(5)\", \"Multiple(2+)\"])\n", "\n", " # Bucketize the float fields. This makes them wide\n", " age_buckets = tf.feature_column.bucketized_column(\n", " source_column=deep_fc[\"mother_age\"],\n", " boundaries=np.arange(15, 45, 1).tolist())\n", " wide_fc[\"age_buckets\"] = tf.feature_column.indicator_column(\n", " categorical_column=age_buckets)\n", "\n", " gestation_buckets = tf.feature_column.bucketized_column(\n", " source_column=deep_fc[\"gestation_weeks\"],\n", " boundaries=np.arange(17, 47, 1).tolist())\n", " wide_fc[\"gestation_buckets\"] = tf.feature_column.indicator_column(\n", " categorical_column=gestation_buckets)\n", "\n", " # Cross all the wide columns, have to do the crossing before we one-hot\n", " crossed = tf.feature_column.crossed_column(\n", " keys=[age_buckets, gestation_buckets],\n", " hash_bucket_size=1000)\n", " deep_fc[\"crossed_embeds\"] = tf.feature_column.embedding_column(\n", " categorical_column=crossed, dimension=nembeds)\n", "\n", " return wide_fc, deep_fc\n", "\n", "\n", "def get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units):\n", " \"\"\"Creates model architecture and returns outputs.\n", "\n", " Args:\n", " wide_inputs: Dense tensor used as inputs to wide side of model.\n", " deep_inputs: Dense tensor used as inputs to deep side of model.\n", " dnn_hidden_units: List of integers where length is number of hidden\n", " layers and ith element is the number of neurons at ith layer.\n", " Returns:\n", " Dense tensor output from the model.\n", " \"\"\"\n", " # Hidden layers for the deep side\n", " layers = [int(x) for x in dnn_hidden_units]\n", " deep = deep_inputs\n", " for layerno, numnodes in enumerate(layers):\n", " deep = tf.keras.layers.Dense(\n", " units=numnodes,\n", " 
activation=\"relu\",\n", " name=\"dnn_{}\".format(layerno+1))(deep)\n", " deep_out = deep\n", "\n", " # Linear model for the wide side\n", " wide_out = tf.keras.layers.Dense(\n", " units=10, activation=\"relu\", name=\"linear\")(wide_inputs)\n", "\n", " # Concatenate the two sides\n", " both = tf.keras.layers.concatenate(\n", " inputs=[deep_out, wide_out], name=\"both\")\n", "\n", " # Final output is a linear activation because this is regression\n", " output = tf.keras.layers.Dense(\n", " units=1, activation=\"linear\", name=\"weight\")(both)\n", "\n", " return output\n", "\n", "\n", "def rmse(y_true, y_pred):\n", " \"\"\"Calculates RMSE evaluation metric.\n", "\n", " Args:\n", " y_true: tensor, true labels.\n", " y_pred: tensor, predicted labels.\n", " Returns:\n", " Tensor with value of RMSE between true and predicted labels.\n", " \"\"\"\n", " return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))\n", "\n", "\n", "def build_wide_deep_model(dnn_hidden_units=[64, 32], nembeds=3):\n", " \"\"\"Builds wide and deep model using Keras Functional API.\n", "\n", " Returns:\n", " `tf.keras.models.Model` object.\n", " \"\"\"\n", " # Create input layers\n", " inputs = create_input_layers()\n", "\n", " # Create feature columns for both wide and deep\n", " wide_fc, deep_fc = create_feature_columns(nembeds)\n", "\n", " # The constructor for DenseFeatures takes a list of numeric columns\n", " # The Functional API in Keras requires: LayerConstructor()(inputs)\n", " wide_inputs = tf.keras.layers.DenseFeatures(\n", " feature_columns=wide_fc.values(), name=\"wide_inputs\")(inputs)\n", " deep_inputs = tf.keras.layers.DenseFeatures(\n", " feature_columns=deep_fc.values(), name=\"deep_inputs\")(inputs)\n", "\n", " # Get output of model given inputs\n", " output = get_model_outputs(wide_inputs, deep_inputs, dnn_hidden_units)\n", "\n", " # Build model and compile it all together\n", " model = tf.keras.models.Model(inputs=inputs, outputs=output)\n", " 
model.compile(optimizer=\"adam\", loss=\"mse\", metrics=[rmse, \"mse\"])\n", "\n", " return model\n", "\n", "\n", "def train_and_evaluate(args):\n", " model = build_wide_deep_model(args[\"nnsize\"], args[\"nembeds\"])\n", " print(\"Here is our Wide-and-Deep architecture so far:\\n\")\n", " print(model.summary())\n", "\n", " trainds = load_dataset(\n", " args[\"train_data_path\"],\n", " args[\"batch_size\"],\n", " 'train')\n", "\n", " evalds = load_dataset(\n", " args[\"eval_data_path\"], 1000, 'eval')\n", " if args[\"eval_steps\"]:\n", " evalds = evalds.take(count=args[\"eval_steps\"])\n", "\n", " num_batches = args[\"batch_size\"] * args[\"num_epochs\"]\n", " steps_per_epoch = args[\"train_examples\"] // num_batches\n", "\n", " checkpoint_path = os.path.join(args[\"output_dir\"], \"checkpoints/babyweight\")\n", " cp_callback = tf.keras.callbacks.ModelCheckpoint(\n", " filepath=checkpoint_path, verbose=1, save_weights_only=True)\n", "\n", " history = model.fit(\n", " trainds,\n", " validation_data=evalds,\n", " epochs=args[\"num_epochs\"],\n", " steps_per_epoch=steps_per_epoch,\n", " verbose=2, # 0=silent, 1=progress bar, 2=one line per epoch\n", " callbacks=[cp_callback])\n", "\n", " EXPORT_PATH = os.path.join(\n", " args[\"output_dir\"], datetime.datetime.now().strftime(\"%Y%m%d%H%M%S\"))\n", " tf.saved_model.save(\n", " obj=model, export_dir=EXPORT_PATH) # with default serving function\n", " \n", " hp_metric = history.history['val_rmse'][-1]\n", "\n", " hpt = hypertune.HyperTune()\n", " hpt.report_hyperparameter_tuning_metric(\n", " hyperparameter_metric_tag='rmse',\n", " metric_value=hp_metric,\n", " global_step=args['num_epochs'])\n", " \n", " print(\"Exported trained model to {}\".format(EXPORT_PATH))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training on Cloud AI Platform\n", "\n", "Now that we see everything is working locally, it's time to train on the cloud! 
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To submit to the Cloud we use [`gcloud ai-platform jobs submit training [jobname]`](https://cloud.google.com/sdk/gcloud/reference/ml-engine/jobs/submit/training) and simply specify some additional parameters for AI Platform Training Service:\n", "- jobname: A unique identifier for the Cloud job. We usually append system time to ensure uniqueness\n", "- job-dir: A GCS location to upload the Python package to\n", "- runtime-version: Version of TF to use.\n", "- python-version: Version of Python to use. Currently only Python 3.7 is supported for TF 2.1.\n", "- region: Cloud region to train in. See [here](https://cloud.google.com/ml-engine/docs/tensorflow/regions) for supported AI Platform Training Service regions\n", "\n", "Below the `-- \\` we add in the arguments for our `task.py` file." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "\n", "OUTDIR=gs://${BUCKET}/babyweight/trained_model\n", "JOBID=babyweight_$(date -u +%y%m%d_%H%M%S)\n", "\n", "gcloud ai-platform jobs submit training ${JOBID} \\\n", " --region=${REGION} \\\n", " --module-name=trainer.task \\\n", " --package-path=$(pwd)/babyweight/trainer \\\n", " --job-dir=${OUTDIR} \\\n", " --staging-bucket=gs://${BUCKET} \\\n", " --master-machine-type=n1-standard-8 \\\n", " --scale-tier=CUSTOM \\\n", " --runtime-version=${TFVERSION} \\\n", " --python-version=${PYTHONVERSION} \\\n", " -- \\\n", " --train_data_path=gs://${BUCKET}/babyweight/data/train*.csv \\\n", " --eval_data_path=gs://${BUCKET}/babyweight/data/eval*.csv \\\n", " --output_dir=${OUTDIR} \\\n", " --num_epochs=10 \\\n", " --train_examples=10000 \\\n", " --eval_steps=100 \\\n", " --batch_size=32 \\\n", " --nembeds=8" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The training job should complete within 15 to 20 minutes. 
You do not need to wait for this training job to finish before moving forward in the notebook, but the later steps require a trained model." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Check our trained model files\n", "\n", "Let's check the directory structure of the trained model outputs in the folder we exported to. We'll want to deploy the saved_model.pb within the timestamped directory as well as the variable values in the variables folder. Therefore, we need the path of the timestamped directory so that everything within it can be found by Cloud AI Platform's model deployment service." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "%%bash\n", "gsutil ls gs://${BUCKET}/babyweight/trained_model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Copyright 2020 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.5.3" } }, "nbformat": 4, "nbformat_minor": 4 }