In [None]:
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

### Overview

In this notebook, you'll learn how to submit a job to the Vertex AI training service. In the job you'll train your TensorFlow 2 model and export the saved model to Cloud Storage.

### Dataset

[CTA - Ridership - Daily Boarding Totals](https://data.cityofchicago.org/Transportation/CTA-Ridership-Daily-Boarding-Totals/6iiy-9s97): This dataset shows systemwide boardings for both bus and rail services provided by Chicago Transit Authority, dating back to 2001.

### Objective

The goal is to forecast future transit ridership in the City of Chicago, based on previous ridership.

## Install packages and dependencies

### Import libraries and define constants

In [None]:
import datetime
import os
import time

import numpy as np
import pandas as pd
import tensorflow as tf

from google.cloud import aiplatform, storage
from google.cloud.aiplatform import gapic as aip
from sklearn.preprocessing import StandardScaler

In [None]:
# Check the TensorFlow version installed in this notebook environment.
# The last expression of the cell is displayed as its output.

tf.__version__

In [None]:
# Enter your project, region, and a bucket name. Then run the cell to make sure the
# Cloud SDK uses the right project for all the commands in this notebook.

PROJECT = 'your-project-name' # REPLACE WITH YOUR PROJECT ID
BUCKET = 'your-regional-bucket' # REPLACE WITH A UNIQUE REGIONAL BUCKET NAME e.g. your PROJECT NAME
REGION = 'us-central1' # REPLACE WITH YOUR BUCKET REGION e.g. us-central1
BUCKET_URI = 'gs://' + BUCKET

# Don't change the following command - this is to check if you have changed the project name above.
# The message is double-quoted on purpose: 'Don''t' is two adjacent string literals
# that Python concatenates into "Dont", silently dropping the apostrophe.
assert PROJECT != 'your-project-name', "Don't forget to change the project variables!"

In [None]:
# Initialize the Vertex SDK for Python.
# The staging bucket is passed as a full gs:// URI (BUCKET_URI) rather than the
# bare bucket name.

aiplatform.init(project=PROJECT, location=REGION, staging_bucket=BUCKET_URI)

In [None]:
# Dataset parameters

# The variable you are predicting
target_col = "total_rides"
# The name of the column with the date field
ts_col = "service_date"

In [None]:
# Model parameters

# Sampling frequency of the series: daily
freq = "D"
# Lookback window (number of past steps fed to the model)
n_input_steps = 30
# How many steps to predict forward
n_output_steps = 7
# Season length in steps; with daily data, 7 steps corresponds to a weekly cycle
n_seasons = 7

# Fraction of the rows used for training; the remainder is used for testing
train_split = 0.8
# Maximum passes through the data (early-stopping will cause training to stop before this)
epochs = 1000
# Terminate training after the validation loss does not decrease after this many epochs
patience = 5

# Number of units in the LSTM layer
lstm_units = 64
input_layer_name = "lstm_input"

In [None]:
# Training parameters

# Base name from which the model and deployment display names are built
MODEL_NAME = "cta_ridership"

### Create a Cloud Storage bucket

**The following steps are required, regardless of your notebook environment.**

When you submit a custom training job using the Vertex AI SDK, your training code
is packaged and uploaded to a Cloud Storage bucket. Vertex AI runs
the code from this package. In this tutorial, Vertex AI also saves the
trained model that results from your job in the same bucket. You can then
deploy that model to a Vertex AI endpoint in order to serve
online predictions.

In [None]:
# Reuse the bucket if it already exists; otherwise create it.
# lookup_bucket() returns None when the bucket is not found, so other failures
# (for example missing permissions) surface as errors instead of being mistaken
# for "bucket missing" and triggering a create attempt.
storage_client = storage.Client()
bucket = storage_client.lookup_bucket(BUCKET)
if bucket is None:
    bucket = storage_client.create_bucket(BUCKET)
    print('Created bucket: ' + BUCKET)
else:
    # Double-quoted so the apostrophe survives ('let''s' concatenates to "lets")
    print("Bucket exists, let's not recreate it.")

## Load and preview the data

Pre-processing on the original dataset has been done for you and made available on Cloud Storage.

In [None]:
# Name of the pre-processed ridership file to load
processed_file = 'cta_ridership.csv'

# Prefer a copy in the working directory (file created in previous lab);
# otherwise fall back to the copy under data/
input_file = processed_file if os.path.exists(processed_file) else f'data/{processed_file}'

In [None]:
# Load the data, using the date column (parsed as dates) as the index
df = pd.read_csv(input_file, parse_dates=True, index_col=ts_col)

# Plot the first 30 rows (days) of ridership
_ = df[target_col].iloc[:30].plot()

In [None]:
# Define some characteristics of the data that will be used later.
# Number of columns in the dataframe (the target column is counted too).
n_features = df.shape[1]

# Positional index of the target column. Used later when creating dataframes.
target_col_num = df.columns.get_loc(target_col)

### Process data

In [None]:
# Split data by row order: the first `train_split` fraction of the rows is used
# for training and the remaining rows for testing.

size = int(len(df) * train_split)
df_train = df.iloc[:size].copy()
df_test = df.iloc[size:].copy()

df_train.head()

In [None]:
# Plot every column of the training split; assigning to `_` hides the Axes repr
_ = df_train.plot()

### Scale values

In [None]:
# Review original (unscaled) values of the training split

df_train.head()

In [None]:
# For neural networks to converge quicker, it is helpful to scale the values.
# For example, each feature might be transformed to have a mean of 0 and std. dev. of 1.
#
# You are working with a mix of features, input timesteps, output horizon, etc.
# which don't work out-of-the-box with common scaling utilities.
# So, here are a couple wrappers to handle scaling and inverting the scaling.

# Two independent scalers: one is fit on the feature columns, the other on the
# target column only, so that target predictions can be un-scaled on their own.
feature_scaler = StandardScaler()
target_scaler = StandardScaler()

def scale(df,
          fit=True,
          target_col=target_col,
          feature_scaler=feature_scaler,
          target_scaler=target_scaler):
    """
    Scale the input features, using a separate scaler for the target.

    The returned dataframe has the same columns, column order and index as `df`.

    Parameters:
    df (pd.DataFrame): Input dataframe
    fit (bool): Whether to fit the scalers to the data (only apply to training data)
    target_col (str): Name of the column that is being predicted
    feature_scaler (StandardScaler): Scaler used for features
    target_scaler (StandardScaler): Scaler used for target

    Returns:
    df_scaled (pd.DataFrame): Scaled dataframe
    """

    # The scalers expect 2-D input, so the target is handled as a single column
    target = df[target_col].values.reshape(-1, 1)
    if fit:
        target_scaler.fit(target)
    target_scaled = target_scaler.transform(target)

    # Select all columns other than target to be features
    features = df.loc[:, df.columns != target_col].values

    if features.shape[1]:  # If there are any features
        if fit:
            feature_scaler.fit(features)
        features_scaled = feature_scaler.transform(features)

        # Combine target and features into one data frame, putting the target
        # back at its original position and keeping the caller's index
        df_scaled = pd.DataFrame(features_scaled, index=df.index)
        df_scaled.insert(df.columns.get_loc(target_col), target_col, target_scaled.ravel())
        df_scaled.columns = df.columns

    else:  # If only target column (no additional features)
        df_scaled = pd.DataFrame(target_scaled, columns=df.columns, index=df.index)

    return df_scaled

def inverse_scale(data, target_scaler=target_scaler):
    """
    Transform the scaled values of the target back into their original form.
    The features are left alone, as we're assuming that the output of the model only includes the target.

    Parameters:
    data (np.array): Array of scaled target values, e.g. (n_samples, n_output_steps)
    target_scaler (StandardScaler): Scaler used for target

    Returns:
    data_unscaled (np.array): Array of the same shape as `data`, in original units
    """

    data = np.asarray(data, dtype=float)
    # The target scaler is fit on a single column, so present every value as a
    # one-feature row, invert the scaling, then restore the original shape.
    flat = data.reshape(-1, 1)
    return target_scaler.inverse_transform(flat).reshape(data.shape)

# Fit the scalers on the training split only, then reuse them on the test split
df_train_scaled = scale(df_train, fit=True)
df_test_scaled = scale(df_test, fit=False)

In [None]:
# Review scaled values of the training split

df_train_scaled.head()

### Create sequences of time series data

In [None]:
def reframe(data, n_input_steps = n_input_steps, n_output_steps = n_output_steps, target_col = target_col):
    """
    Turn a dataframe of consecutive time steps into supervised-learning windows.

    Each output row holds the values of every column for the previous
    `n_input_steps` steps (the features) and the value of `target_col` for the
    current step and the following `n_output_steps - 1` steps (the targets).
    Rows without a complete window are dropped.

    Parameters:
    data (pd.DataFrame): Input dataframe, one row per time step
    n_input_steps (int): Lookback window
    n_output_steps (int): How many steps to predict forward
    target_col (str): Name of the column that is being predicted

    Returns:
    (features, targets): Tuple of dataframes with one row per window
    """

    # Use the width of the frame that was passed in rather than a notebook global,
    # so the column arithmetic below always matches `data`.
    n_cols = data.shape[1]
    target_col_num = data.columns.get_loc(target_col)

    # Past steps t-n_input_steps .. t-1, followed by steps t .. t+n_output_steps-1
    past = [data.shift(i) for i in range(n_input_steps, 0, -1)]
    future = [data.shift(-i) for i in range(n_output_steps)]

    # Concatenate values and remove any rows with missing values
    windows = pd.concat(past + future, axis=1).dropna()

    # Split the data into feature and target variables
    n_feature_cols = n_input_steps * n_cols
    features = windows.iloc[:, :n_feature_cols]
    # The target column recurs every `n_cols` columns within the future part
    target_cols = list(range(n_feature_cols + target_col_num,
                             n_feature_cols + n_output_steps * n_cols,
                             n_cols))
    targets = windows.iloc[:, target_cols]

    return (features, targets)

# Build (features, targets) windows separately for the scaled train and test splits
X_train_reframed, y_train_reframed = reframe(df_train_scaled)
X_test_reframed, y_test_reframed = reframe(df_test_scaled)

## Build a model and submit your training job to Vertex AI

The model you're building here trains pretty fast, so you could train it in this notebook, but for more computationally expensive models, it's useful to train them in the cloud. To use Vertex AI Training, you'll package up your training code and submit a training job to the Vertex AI training service.

In your training script, you'll also export your trained `SavedModel` to a Cloud Storage bucket.

### Prepare test data

In [None]:
# Reshape the train and test data to match the model's inputs and outputs:
# inputs become (samples, n_input_steps, n_features), outputs (samples, n_output_steps)

X_train = X_train_reframed.to_numpy().reshape((-1, n_input_steps, n_features))
y_train = y_train_reframed.to_numpy().reshape((-1, n_output_steps))
X_test = X_test_reframed.to_numpy().reshape((-1, n_input_steps, n_features))
y_test = y_test_reframed.to_numpy().reshape((-1, n_output_steps))

In [None]:
# Specify directories to be used later

# Local folder that holds the training script and the .npy data files
TRAINER_DIR = "trainer"
EXPORT_DIR = "tf_export"

In [None]:
# Create trainer directory if it doesn't already exist.
# os.makedirs(..., exist_ok=True) does nothing when the directory is already
# there, so re-running this cell does not produce an error.

os.makedirs(TRAINER_DIR, exist_ok=True)

In [None]:
# Save the numpy arrays as .npy files inside the trainer directory

np.save(f'{TRAINER_DIR}/x_train.npy', X_train)
np.save(f'{TRAINER_DIR}/x_test.npy', X_test)
np.save(f'{TRAINER_DIR}/y_train.npy', y_train)
np.save(f'{TRAINER_DIR}/y_test.npy', y_test)

### Prepare model code

In [None]:
# Write training code out to a file that will be submitted to the training job
# Note: f-strings are supported in Python 3.6 and above
#
# The f-string below already interpolates the notebook's hyperparameters, so the
# result is written to disk as-is. A second `.format(**globals())` pass would be
# redundant and would fail as soon as the generated code contained a literal brace.

model_template = f"""import argparse
import numpy as np
import os
import tempfile

from google.cloud import storage
from tensorflow import keras
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Dense, LSTM
from tensorflow.keras.callbacks import EarlyStopping

n_features = {n_features} # Two features: y (previous values) and whether the date is a holiday
n_input_steps = {n_input_steps} # Lookback window
n_output_steps = {n_output_steps} # How many steps to predict forward

epochs = {epochs} # How many passes through the data (early-stopping will cause training to stop before this)
patience = {patience} # Terminate training after the validation loss does not decrease after this many epochs

def download_blob(bucket_name, source_blob_name, destination_file_name):
    '''Downloads a blob from the bucket.'''
    # bucket_name = "your-bucket-name"
    # source_blob_name = "storage-object-name"
    # destination_file_name = "local/path/to/file"

    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)

    # Construct a client side representation of a blob.
    # Note `Bucket.blob` differs from `Bucket.get_blob` as it doesn't retrieve
    # any content from Google Cloud Storage. As we don't need additional data,
    # using `Bucket.blob` is preferred here.
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)

    print("Blob " + source_blob_name + " downloaded to " + destination_file_name + ".")

def extract_bucket_and_prefix_from_gcs_path(gcs_path: str):
    '''Given a complete GCS path, return the bucket name and prefix as a tuple.

    Example Usage:

        bucket, prefix = extract_bucket_and_prefix_from_gcs_path(
            "gs://example-bucket/path/to/folder"
        )

        # bucket = "example-bucket"
        # prefix = "path/to/folder"

    Args:
        gcs_path (str):
            Required. A full path to a Google Cloud Storage folder or resource.
            Can optionally include "gs://" prefix or end in a trailing slash "/".

    Returns:
        Tuple[str, Optional[str]]
            A (bucket, prefix) pair from provided GCS path. If a prefix is not
            present, a None will be returned in its place.
    '''
    if gcs_path.startswith("gs://"):
        gcs_path = gcs_path[5:]
    if gcs_path.endswith("/"):
        gcs_path = gcs_path[:-1]

    gcs_parts = gcs_path.split("/", 1)
    gcs_bucket = gcs_parts[0]
    gcs_blob_prefix = None if len(gcs_parts) == 1 else gcs_parts[1]

    return (gcs_bucket, gcs_blob_prefix)

def get_args():
    '''Parse the command-line arguments passed to the training job.'''
    parser = argparse.ArgumentParser()
    # Required: without it there is no training data to download
    parser.add_argument(
        '--data-uri',
        required=True,
        help='URL where the training files are located')
    args = parser.parse_args()
    print(args)
    return args

def main():
    args = get_args()
    bucket_name, blob_prefix = extract_bucket_and_prefix_from_gcs_path(args.data_uri)

    # Get the training data and convert back to np arrays
    local_data_dir = tempfile.gettempdir()
    files = ['x_train.npy', 'y_train.npy', 'x_test.npy', 'y_test.npy']

    for file in files:
        # The prefix is None when the files sit directly at the bucket root
        blob_name = blob_prefix + '/' + file if blob_prefix else file
        download_blob(bucket_name, blob_name, os.path.join(local_data_dir, file))

    X_train = np.load(local_data_dir + '/x_train.npy')
    y_train = np.load(local_data_dir + '/y_train.npy')
    X_test = np.load(local_data_dir + '/x_test.npy')
    y_test = np.load(local_data_dir + '/y_test.npy')

    # Build and train the model
    model = Sequential([
        LSTM({lstm_units}, input_shape=[n_input_steps, n_features], recurrent_activation=None),
        Dense(n_output_steps)])

    model.compile(optimizer='adam', loss='mae')

    early_stopping = EarlyStopping(monitor='val_loss', patience=patience)
    _ = model.fit(x=X_train, y=y_train, validation_data=(X_test, y_test), epochs=epochs, callbacks=[early_stopping])

    # Export the model to the location read from the AIP_MODEL_DIR environment variable
    model.save(os.environ["AIP_MODEL_DIR"])

if __name__ == '__main__':
    main()
"""

with open(os.path.join(TRAINER_DIR, 'task.py'), 'w') as f:
    f.write(model_template)

In [None]:
# Copy the data files to a GCS bucket

!gsutil -m cp -r trainer/*.npy $BUCKET_URI/$TRAINER_DIR

In [None]:
# List the contents of the bucket to ensure they were copied properly

!gsutil ls $BUCKET_URI/$TRAINER_DIR

### Submit training job

In [None]:
# Set training job parameters

# Container versions used for training and for serving predictions
TRAIN_VERSION = "tf-cpu.2-6"
DEPLOY_VERSION = "tf2-cpu.2-6"

TRAIN_IMAGE = f"us-docker.pkg.dev/vertex-ai/training/{TRAIN_VERSION}:latest"
DEPLOY_IMAGE = f"us-docker.pkg.dev/vertex-ai/prediction/{DEPLOY_VERSION}:latest"

# Command-line arguments handed to the training script
CMDARGS = ["--data-uri=" + BUCKET_URI + "/" + TRAINER_DIR]

In [None]:
# Re-run these additional parameters if you need to create a new training job

# Date and time in a compact form: unlike a time-of-day value, it stays unique
# across days and contains no ':' or '.' characters.
TIMESTAMP = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
JOB_NAME = 'vertex_ai_training_' + TIMESTAMP
# Underscore keeps the model name and the timestamp visually separate
MODEL_DISPLAY_NAME = MODEL_NAME + '_' + TIMESTAMP

In [None]:
# Create and run the training job

# Describe the job: which script to run, in which container, and which
# container should later serve the resulting model
job = aiplatform.CustomTrainingJob(
    display_name=JOB_NAME,
    script_path=TRAINER_DIR + "/task.py",
    container_uri=TRAIN_IMAGE,
    model_serving_container_image_uri=DEPLOY_IMAGE,
)

# Run the job; the returned model is deployed in the next section
model = job.run(args=CMDARGS, model_display_name=MODEL_DISPLAY_NAME)

## Deploy the model

In [None]:
# Display name for the deployed model
DEPLOYED_NAME = f"{MODEL_NAME}_deployed-{TIMESTAMP}"

# Deploy on a single n1-standard-4 replica and route all traffic to this model
endpoint = model.deploy(
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=1,
    traffic_split={"0": 100},
    deployed_model_display_name=DEPLOYED_NAME,
)

## Get predictions on deployed model

In [None]:
# Get predictions for the first test instance.
# Only that instance is sent to the endpoint: the rest of the test set is not
# needed for a single prediction, and this keeps the request payload small.

raw_predictions = endpoint.predict(instances=X_test[:1].tolist()).predictions[0]
predicted_values = inverse_scale(np.array([raw_predictions])).round()

# Un-scale the matching ground truth so both are in the original units
actual_values = inverse_scale(np.array([y_test[0]]))

In [None]:
# Print prediction and compare to actual value.
# Both arrays are in original (un-scaled) units; the label padding lines up the two rows.

print('Predicted riders:', predicted_values)
print('Actual riders:   ', actual_values)

## Cleanup

In [None]:
# Choose which resources to remove; each flag guards the matching step below
delete_training_job = True
delete_model = True
delete_endpoint = True

# Warning: Setting this to true will delete everything in your bucket
delete_bucket = False

# Delete the training job
if delete_training_job:
    job.delete()

# Delete the endpoint
if delete_endpoint:
    endpoint.delete(force=True)

# Delete the model (done after the endpoint step, where the model was deployed)
if delete_model:
    model.delete()

# Warning: uncomment this section only if you want to delete the entire bucket
# if delete_bucket and "BUCKET_URI" in globals():
#     ! gsutil -m rm -r $BUCKET_URI

## Conclusion

In this section, you've learned how to:
* Prepare data and models for training in the cloud
* Train your model and monitor the progress of the job with Vertex AI Training
* Predict using the model deployed to a Vertex AI endpoint