courses/machine_learning/feateng/taxifare/trainer/model.py

#!/usr/bin/env python

# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import tensorflow as tf

tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)

CSV_COLUMNS = 'fare_amount,dayofweek,hourofday,pickuplon,pickuplat,dropofflon,dropofflat,passengers,key'.split(',')
LABEL_COLUMN = 'fare_amount'
KEY_FEATURE_COLUMN = 'key'
DEFAULTS = [[0.0], ['Sun'], [0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]

# Real-valued columns that were scaled during preprocessing; the TFRecord
# input function below reads these (this was referenced but left undefined).
SCALE_COLUMNS = ['pickuplon', 'pickuplat', 'dropofflon', 'dropofflat', 'passengers']

# These are the raw input columns, and will be provided for prediction also
INPUT_COLUMNS = [
    # Categorical features
    tf.feature_column.categorical_column_with_vocabulary_list(
        'dayofweek', vocabulary_list = ['Sun', 'Mon', 'Tues', 'Wed', 'Thu', 'Fri', 'Sat']),
    tf.feature_column.categorical_column_with_identity('hourofday', num_buckets = 24),

    # Numeric columns
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),

    # Engineered features that are created in the input_fn
    tf.feature_column.numeric_column('latdiff'),
    tf.feature_column.numeric_column('londiff'),
    tf.feature_column.numeric_column('euclidean')
]

# Build the estimator
def build_estimator(model_dir, nbuckets, hidden_units):
    """
    Build an estimator starting from INPUT_COLUMNS.
    These include feature transformations and synthetic features.
    The model is a wide-and-deep model.
    """

    # Input columns
    (dayofweek, hourofday, plat, plon, dlat, dlon, pcount,
     latdiff, londiff, euclidean) = INPUT_COLUMNS

    # Bucketize the lats & lons
    latbuckets = np.linspace(38.0, 42.0, nbuckets).tolist()
    lonbuckets = np.linspace(-76.0, -72.0, nbuckets).tolist()
    b_plat = tf.feature_column.bucketized_column(plat, latbuckets)
    b_dlat = tf.feature_column.bucketized_column(dlat, latbuckets)
    b_plon = tf.feature_column.bucketized_column(plon, lonbuckets)
    b_dlon = tf.feature_column.bucketized_column(dlon, lonbuckets)

    # Feature crosses
    ploc = tf.feature_column.crossed_column([b_plat, b_plon], nbuckets * nbuckets)
    dloc = tf.feature_column.crossed_column([b_dlat, b_dlon], nbuckets * nbuckets)
    pd_pair = tf.feature_column.crossed_column([ploc, dloc], nbuckets ** 4)
    day_hr = tf.feature_column.crossed_column([dayofweek, hourofday], 24 * 7)

    # Wide columns and deep columns
    wide_columns = [
        # Feature crosses
        dloc, ploc, pd_pair,
        day_hr,

        # Sparse columns
        dayofweek, hourofday,

        # Anything with a linear relationship
        pcount
    ]
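    # For intuition on the cross sizes above (a sketch assuming nbuckets = 16;
    # not executed as part of the model): each location cross hashes a
    # (lat bucket, lon bucket) pair into nbuckets * nbuckets hash buckets, and
    # pd_pair hashes (pickup cell, dropoff cell) into nbuckets ** 4 buckets.
    # Collisions are possible since these are hash buckets, not exact cells
    # (nbuckets boundaries actually yield nbuckets + 1 bucketized ranges).
    #
    #   nbuckets = 16
    #   nbuckets * nbuckets   # 256 hash buckets each for ploc and dloc
    #   nbuckets ** 4         # 65536 hash buckets for pickup-dropoff pairs
    #   24 * 7                # 168 buckets for day_hr, one per (day, hour)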
    deep_columns = [
        # Embedding_column to "group" together ...
        tf.feature_column.embedding_column(pd_pair, 10),
        tf.feature_column.embedding_column(day_hr, 10),

        # Numeric columns
        plat, plon, dlat, dlon,
        latdiff, londiff, euclidean
    ]

    ## setting the checkpoint interval to be much lower for this task
    run_config = tf.estimator.RunConfig(save_checkpoints_secs = 30,
                                        keep_checkpoint_max = 3)
    estimator = tf.estimator.DNNLinearCombinedRegressor(
        model_dir = model_dir,
        linear_feature_columns = wide_columns,
        dnn_feature_columns = deep_columns,
        dnn_hidden_units = hidden_units,
        config = run_config)

    # add extra evaluation metric for hyperparameter tuning
    estimator = tf.compat.v1.estimator.add_metrics(estimator, add_eval_metrics)
    return estimator

# Create feature engineering function that will be used in the input and serving input functions
def add_engineered(features):
    # this is how you can do feature engineering in TensorFlow
    lat1 = features['pickuplat']
    lat2 = features['dropofflat']
    lon1 = features['pickuplon']
    lon2 = features['dropofflon']
    latdiff = (lat1 - lat2)
    londiff = (lon1 - lon2)

    # set features for distance with sign that indicates direction
    features['latdiff'] = latdiff
    features['londiff'] = londiff
    dist = tf.sqrt(latdiff * latdiff + londiff * londiff)
    features['euclidean'] = dist
    return features

# Create serving input function to be able to serve predictions
def serving_input_fn():
    feature_placeholders = {
        # All the real-valued columns
        column.name: tf.compat.v1.placeholder(tf.float32, [None])
        for column in INPUT_COLUMNS[2:7]
    }
    feature_placeholders['dayofweek'] = tf.compat.v1.placeholder(tf.string, [None])
    feature_placeholders['hourofday'] = tf.compat.v1.placeholder(tf.int32, [None])

    features = add_engineered(feature_placeholders.copy())
    return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)

# Create input function to load data into datasets
def read_dataset(filename, mode, batch_size = 512):
    def _input_fn():
        def decode_csv(value_column):
            columns = tf.compat.v1.decode_csv(value_column, record_defaults = DEFAULTS)
            features = dict(zip(CSV_COLUMNS, columns))
            label = features.pop(LABEL_COLUMN)
            return add_engineered(features), label

        # Create list of files that match pattern
        file_list = tf.compat.v1.gfile.Glob(filename)

        # Create dataset from file list
        dataset = tf.compat.v1.data.TextLineDataset(file_list).map(decode_csv)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None # indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1 # end-of-input after this

        dataset = dataset.repeat(num_epochs).batch(batch_size)
        batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()
        return batch_features, batch_labels
    return _input_fn

# Create estimator train and evaluate function
def train_and_evaluate(args):
    tf.compat.v1.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file
    estimator = build_estimator(args['output_dir'], args['nbuckets'], args['hidden_units'].split(' '))
    train_spec = tf.estimator.TrainSpec(
        input_fn = read_dataset(
            filename = args['train_data_paths'],
            mode = tf.estimator.ModeKeys.TRAIN,
            batch_size = args['train_batch_size']),
        max_steps = args['train_steps'])
    exporter = tf.estimator.LatestExporter('exporter', serving_input_fn)
    eval_spec = tf.estimator.EvalSpec(
        input_fn = read_dataset(
            filename = args['eval_data_paths'],
            mode = tf.estimator.ModeKeys.EVAL,
            batch_size = args['eval_batch_size']),
        steps = 100,
        exporters = exporter)
    tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
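# Example invocation of train_and_evaluate (a minimal sketch; the arguments
# are typically assembled by a companion task.py, not shown here, and every
# value below is hypothetical):
#
#   train_and_evaluate({
#       'output_dir': 'gs://my-bucket/taxifare/trained_model',   # hypothetical
#       'nbuckets': 16,
#       'hidden_units': '128 32 4',   # parsed with .split(' ') above
#       'train_data_paths': 'gs://my-bucket/taxifare/train*.csv',
#       'eval_data_paths': 'gs://my-bucket/taxifare/valid.csv',
#       'train_batch_size': 512,
#       'eval_batch_size': 512,
#       'train_steps': 5000,
#   })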
# If we want to use TFRecords instead of CSV
def gzip_reader_fn():
    return tf.compat.v1.TFRecordReader(
        options = tf.compat.v1.python_io.TFRecordOptions(
            compression_type = tf.compat.v1.python_io.TFRecordCompressionType.GZIP))

def generate_tfrecord_input_fn(data_paths, num_epochs = None, batch_size = 512,
                               mode = tf.estimator.ModeKeys.TRAIN):
    def get_input_features():
        # Read the tfrecords. Same input schema as in preprocess
        input_schema = {}
        if mode != tf.estimator.ModeKeys.PREDICT:  # no label at prediction time
            input_schema[LABEL_COLUMN] = tf.io.FixedLenFeature(
                shape = [1], dtype = tf.float32, default_value = 0.0)
        for name in ['dayofweek', 'key']:
            input_schema[name] = tf.io.FixedLenFeature(
                shape = [1], dtype = tf.string, default_value = 'null')
        for name in ['hourofday']:
            input_schema[name] = tf.io.FixedLenFeature(
                shape = [1], dtype = tf.int64, default_value = 0)
        for name in SCALE_COLUMNS:
            input_schema[name] = tf.io.FixedLenFeature(
                shape = [1], dtype = tf.float32, default_value = 0.0)

        # Read batches of keyed examples; requires the TF 1.x contrib API
        keys, features = tf.contrib.learn.io.read_keyed_batch_features(
            data_paths[0] if len(data_paths) == 1 else data_paths,
            batch_size,
            input_schema,
            reader = gzip_reader_fn,
            reader_num_threads = 4,
            queue_capacity = batch_size * 2,
            randomize_input = (mode != tf.estimator.ModeKeys.EVAL),
            num_epochs = (1 if mode == tf.estimator.ModeKeys.EVAL else num_epochs))
        target = features.pop(LABEL_COLUMN)
        features[KEY_FEATURE_COLUMN] = keys
        return add_engineered(features), target

    # Return a function to input the features into the model from a data path.
    return get_input_features

def add_eval_metrics(labels, predictions):
    pred_values = predictions['predictions']
    return {
        'rmse': tf.compat.v1.metrics.root_mean_squared_error(labels, pred_values)
    }
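# For reference, the exported serving signature from serving_input_fn() takes
# only the raw inputs; add_engineered() computes latdiff, londiff, and
# euclidean server-side, so clients never send the engineered features. A
# hypothetical JSON instance for the deployed model might look like:
#
#   {"dayofweek": "Thu", "hourofday": 17,
#    "pickuplat": 40.758, "pickuplon": -73.985,
#    "dropofflat": 40.641, "dropofflon": -73.778,
#    "passengers": 2.0}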