tools/agile-machine-learning-api/codes/trainer/input_pipeline_dask.py
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Input Pipeline for loading data into the model
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from multiprocessing import cpu_count
import dask
import dask.dataframe as dd
import dill
import lime.lime_tabular
import numpy as np
import pandas as pd
import tensorflow as tf
from google.cloud import storage
from six.moves.urllib.parse import urlparse
class InputReader(object):
"""Class for reading input from different sources
Assuming csv files for now
"""
def __init__(self, csv_path, task_type, target_var, na_values=None,
column_names=None,
to_drop=None, gcs_path=False, data_type=None):
"""The init method initialise and keeps track of the source of input
(say csv, json etc) and other variables.
Arguments:
csv_path : string, Path of the csv files whether local or on remote storage
task_type : string, ML task at hand, following options are expected
[classification, regression, clustering]
target_var : string, Name of the dependent/target variable
na_values : string, String by which the na values are represented in the data
            column_names : string, Path of a text file listing the column names, one per line
to_drop : list, Any redundant columns which can be dropped
            gcs_path : boolean, Whether the csv is stored on Google Cloud Storage
data_type : dict, dictionary containing the data type of all columns in format
{'a': 'float', 'b': 'object', 'c': 'int' }
"""
self.csv_path = csv_path
self.task_type = task_type
self.target_var = target_var
self.na_values = na_values
self.to_drop = to_drop
self.gcs_path = gcs_path
self.data_type = data_type
if column_names:
with tf.gfile.Open(column_names, 'r') as f:
self.column_names = [line.rstrip() for line in f]
self.column_names = [
line for line in self.column_names if line]
else:
self.column_names = column_names
def parse_csv_wrap(self):
"""
A Wrapper function for parsing csv files
Returns:
_parse_csv function
"""
return self._parse_csv()
def _parse_csv(self):
"""Reads csv files in dask to determine the datatypes and other features about data
this helps in creating a dataset object in tensorflow
Returns:
df : dask dataframe, parsed dataframe object
list(df.columns) : list, list of column names
"""
if self.gcs_path:
if isinstance(self.csv_path, list):
for index, path in enumerate(self.csv_path):
parse_result = urlparse(path)
bucket = parse_result.hostname
csv_name = parse_result.path
self._download_csv(
bucket,
csv_name,
                        path_name='/tmp/data_' + str(index) + '.csv')
csv_path = '/tmp/data_*.csv'
else:
parse_result = urlparse(self.csv_path)
bucket = parse_result.hostname
csv_name = parse_result.path
self._download_csv(bucket, csv_name)
csv_path = '/tmp/data.csv'
else:
csv_path = self.csv_path
if self.column_names:
header = None
else:
header = 'infer'
try:
df = dd.read_csv(
csv_path,
names=self.column_names,
header=header,
na_values=self.na_values,
sample=12800000,
dtype=self.data_type)
if isinstance(csv_path, list):
len(df) # Checks whether schema is consistent throughout the data
except Exception:
raise AssertionError(
'Data types given are inconsistent with data provided')
if self.to_drop is not None:
drop_column_names = self.to_drop
drop_column_names = [
name for name in drop_column_names if name in df.columns]
df = self.drop_cols(df, drop_column_names)
tf.logging.info('Dropping the columns : %s', drop_column_names)
return df, list(df.columns)
@classmethod
def drop_cols(cls, df, col_names):
"""Drops any columns which are not required by the user.
Arguments:
df : dask dataframe, Dataframe of input data
col_names : list, Columns in the data to be dropped
        Returns:
            dask dataframe, Updated dataframe with the columns dropped
"""
return df.drop(col_names, axis=1)
@classmethod
def _download_csv(cls, bucket_name, csv_path, path_name='/tmp/data.csv'):
"""Utility to download the csv files which is stored on Google Cloud Storage
to local files system. Once processed the file will be deleted
Arguments:
bucket_name : string, Remote location of csv file
csv_name : string, Name of the csv file on GCS
"""
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(csv_path)
blob.download_to_filename(path_name)
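# A minimal usage sketch of InputReader (illustrative only; the GCS path,
# target column and other values below are hypothetical placeholders):
#
#   reader = InputReader(
#       csv_path='gs://my-bucket/data/train.csv',
#       task_type='classification',
#       target_var='label',
#       gcs_path=True)
#   df, column_names = reader.parse_csv_wrap()
#   # df is a dask dataframe; column_names is the list of its columns.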
class BasicStats(object):
"""Calculating stats and using them for cleaning the data"""
def __init__(self):
"""Data type parameters"""
def is_not_used(self):
pass
@classmethod
def dropping_zero_var_cols(cls, df, target_var, stddev_list):
"""Check columns which have zero variance and removes the from the dataframe.
As the zero variance columns or contant columns can't be considered as output column
Arguments:
df : dask dataframe, The dataframe to validate
stddev : dask series, Series containing the standard deviation values for columns
target_var : string, Dependent variable for the analysis
Returns:
df : dask dataframe, Dataframe with redundant columns removed
Raises:
AssertionError : If the target column has zero deviation
"""
continuous_cols = [
col for col in df.columns if df[col].dtype != 'object']
for col in continuous_cols:
if stddev_list[col] == 0.0:
df = df.drop(col, axis=1)
if col == target_var:
                    err_msg = 'Target variable has zero standard deviation or is a constant column. ' \
'Please check the data'
tf.logging.error(err_msg)
raise AssertionError(err_msg)
return df
@classmethod
def normalize(cls, df, target_var, mean_list, stddev_list):
"""Normalizes the numerical columns in a dataframe.
Arguments:
df : dask dataframe, The dataframe to normalize
target_var : string, Dependent variable for the analysis
mean_list : dask series, Series with all the mean values
stddev_list : dask series, Series with all the standard deviation values
Returns:
            df : dask dataframe, Dataframe with z-score normalized numerical columns
"""
continuous_cols = [
col for col in df.columns if df[col].dtype != 'object' and col != target_var]
for col in continuous_cols:
df[col] = df[col].sub(mean_list[col]).div(stddev_list[col])
return df
@classmethod
def calculate_stats(cls, df, target_var):
"""Calculates descriptive stats of the dataframe required for cleaning.
Arguments:
df : dask dataframe, The dataframe at hand
target_var : string, Dependent variable for the analysis
Returns:
mean : dask series, mean of each column
median : dask series, median of each column
            dict(zip(categorical_columns, mode)) : dict, Dictionary containing the
                categorical columns as keys and their modes as values
std : dask series, standard deviation of each column
"""
categorical_columns = [
col for col in df.columns if col != target_var and df[col].dtype == 'object']
mean_op = df.mean()
std_op = df.std()
median_op = df.quantile(0.5)
mode_op = [df[col].value_counts().idxmax()
for col in categorical_columns]
mean, median, mode, std = dask.compute(
mean_op, median_op, mode_op, std_op)
return mean, median, dict(zip(categorical_columns, mode)), std
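    # How calculate_stats is typically consumed (a sketch; `df` is assumed to be
    # a dask dataframe and 'label' a hypothetical target column). The four
    # reductions are built lazily and evaluated in a single dask.compute call:
    #
    #   mean, median, mode, std = BasicStats.calculate_stats(df, 'label')
    #   # mean, median and std are series indexed by column name;
    #   # mode maps each categorical column to its most frequent level.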
@classmethod
def impute(cls, df, target_var, median, mode):
"""Imputing missing values using median for continuous columns and mode
for categorical columns.
Arguments:
df : dask dataframe, The dataframe at hand
target_var : string, Dependent variable for the analysis
            median : series, median of each continuous column in the data
            mode : dict, Dictionary mapping each categorical column to its mode
Returns:
df : dask dataframe, Dataframe without missing values
"""
missing_stats = df.isna().sum().compute()
cols = [col for col in df.columns if col != target_var]
for col in cols:
if missing_stats[col] > 0 and df[col].dtype == 'object':
df[col] = df[col].fillna(mode[col])
elif missing_stats[col] > 0:
df[col] = df[col].fillna(median[col])
return df
def clean_data(self, df, target_var, task_type, name):
"""Cleans a dataset by removing outliers
Outiers and missing values are replaced by
median for continuous and mode for categorical
Arguments:
df : dask dataframe, The dataframe to be cleaned
target_var : string, Name of the target variable
task_type : string, Type of the task at hand
name : string, Name of the data being cleaned (train or eval)
Returns:
df : dask dataframe, Cleaned dataframe
mean : dask series, mean of each column
std_dev : dask series, standard deviation of each column
_csv_defaults : list, list of default value of each column
"""
mean, median, mode, std_dev = self.calculate_stats(df, target_var)
df = self.dropping_zero_var_cols(df, target_var, std_dev)
df = self.impute(df, target_var, median, mode)
if task_type == 'classification':
if df[target_var].dtype == 'float64':
df[target_var] = df[target_var].astype(np.int64)
dtype_map = {'float64': 0., 'int64': 0, 'object': ''}
dtype_list = [str(dtype) for dtype in df.dtypes]
_csv_defaults = [[dtype_map[dtype]] for dtype in dtype_list]
if name == 'train' and task_type == 'classification':
self.creating_explainer_lime(df, target_var)
df.to_csv('/tmp/clean_*_' + str(name) + '.csv', index=False)
return df, mean, std_dev, _csv_defaults
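    # Illustrative end-to-end cleaning call (assumes `train_df` is a dask
    # dataframe with a hypothetical 'label' target column). clean_data drops
    # zero-variance columns, imputes missing values and writes the cleaned
    # partitions to /tmp/clean_*_train.csv:
    #
    #   stats = BasicStats()
    #   train_df, mean, std_dev, csv_defaults = stats.clean_data(
    #       train_df, target_var='label', task_type='classification', name='train')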
def find_vocab(self, df):
"""Finds the number of levels in each categorical column.
Helps for creation of feature columns for use in tf.data API
Arguments:
df : dask dataframe, Dataframe to extract the levels from
Returns:
A dictionary of column names and the levels in each variables
[ 0 for numerical columns and number of levels for categorical columns]
"""
self.is_not_used()
cat_columns = [
col for col in df.columns if df[col].dtype == 'object']
continuous_cols = [
col for col in df.columns if df[col].dtype != 'object']
temp = dask.compute([df[col].drop_duplicates() for col in cat_columns])
column_mapping = dict()
for col in continuous_cols:
column_mapping[col] = 0
for index, col in enumerate(cat_columns):
column_mapping[col] = np.array(temp[0][index])
return column_mapping
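    # Shape of the mapping returned by find_vocab (column names and levels here
    # are hypothetical): numerical columns map to 0, categorical columns map to
    # an array of their distinct levels, e.g.
    #
    #   {'age': 0, 'income': 0, 'gender': np.array(['male', 'female'], dtype=object)}
    #
    # This mapping is later consumed by DatasetInput._create_feature_columns.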
def creating_explainer_lime(self, df, target_var):
"""Creates a LIME explainer and saves it as a pickle object
Arguments:
df : dask dataframe, Dataframe for which explainer is to be created
target_var : string, Output column of the dataframe
"""
self.is_not_used()
pandas_df = df.compute()
class_names = list(pandas_df[target_var].unique())
pandas_df = pandas_df.drop(target_var, axis=1)
dict_mapping = dict()
categorical_columns = [
col for col in pandas_df.columns if pandas_df[col].dtype == 'object']
categorical_columns_index = [index for index in range(0, len(
pandas_df.columns)) if pandas_df[pandas_df.columns[index]].dtype == 'object']
for col in categorical_columns:
pandas_df[col] = pd.Categorical(
pandas_df[col], categories=pandas_df[col].unique())
dict_mapping[col] = dict(enumerate(pandas_df[col].cat.categories))
pandas_df[col] = pandas_df[col].cat.codes
feature_names = list(pandas_df.columns)
dict_of_feature_names = dict()
for col_index in categorical_columns_index:
            dict_of_feature_names[col_index] = \
                dict_mapping[feature_names[col_index]].values()
explainer = lime.lime_tabular.LimeTabularExplainer(
np.array(pandas_df),
feature_names=feature_names,
class_names=class_names,
categorical_features=categorical_columns_index,
categorical_names=dict_of_feature_names,
verbose=True)
with open('/tmp/lime_explainer', 'wb') as dill_file:
dill.dump(explainer, dill_file)
dill.dump(dict_mapping, dill_file)
dill.dump(feature_names, dill_file)
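# The LIME explainer above is serialized with three sequential dill.dump calls
# into a single file, so it can be restored with three matching dill.load calls
# (a sketch, assuming /tmp/lime_explainer was written by creating_explainer_lime):
#
#   with open('/tmp/lime_explainer', 'rb') as dill_file:
#       explainer = dill.load(dill_file)
#       dict_mapping = dill.load(dill_file)
#       feature_names = dill.load(dill_file)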
class DatasetInput(object):
"""
Class for building a tf.data object and input function for tf.Estimator
"""
def __init__(self, num_epochs, batch_size, buffer_size,
csv_defaults, csv_cols, target_var, task_type,
condition=None):
"""Initializes the dataset object for a csv reader
num_epochs : integer, number of epochs to run
batch_size : integer, batch size of the data
buffer_size : integer, buffer size
csv_defaults : dict, default value for each column
csv_cols : list, list of column names of the data
target_var : string, name of the target variable
feat_cols : list, tf.featurecolumn objects to define features
task_type : string, ML task at hand, following options are expected
[classification, regression, clustering]
condition : string, condition of target variable
"""
self.num_epochs = num_epochs
self.batch_size = batch_size
self.buffer_size = buffer_size
self.csv_defaults = csv_defaults
self.csv_cols = csv_cols
self.target_var = target_var
self.feat_cols = []
self.task_type = task_type
self.condition = condition
def parse_csv(self, line):
"""Decodes an item from the textline dataset and parses them into columns
Arguments:
line : string, The items returned by the dataset object
        Returns:
            features : dict, Dictionary mapping each feature column name to its parsed tensor
            label : tensor, Parsed tensor of the label column
"""
parsed_line = tf.decode_csv(line, record_defaults=self.csv_defaults)
tf.logging.info(
'The Default datatypes read are : %s',
self.csv_defaults)
features = dict(zip(self.csv_cols, parsed_line))
label = features.pop(self.target_var)
if self.condition:
label = tf.equal(label, self.condition)
return features, label
@staticmethod
def _get_pattern(name, csv_path=None):
"""
        Helper function for returning the naming pattern of the cleaned data
        Arguments:
            name : string, type of the data ['train' or 'eval']
            csv_path : string, path of the cleaned csv
        Returns:
            pattern : string, glob path of the cleaned data
"""
pattern = '/tmp/clean_*_{}*'.format(name)
if csv_path is not None:
pattern = csv_path
return pattern
def input_fn(self, name, csv_path=None):
"""Creates a dataset object for the model to consume. Input function for estimator
Arguments:
name : string, Name of the data [Train or Eval]
csv_path : The path of the csv on any storage system
Returns:
features : tf.data.TextLineDataset object, Dataset containing batch of features
labels : tf.data.TextLineDataset object, Dataset containing batch of labels
"""
pattern = self._get_pattern(name, csv_path)
tf.logging.info('The Pattern of files is : %s', pattern)
filenames = tf.matching_files(pattern=pattern)
dataset = tf.data.TextLineDataset(filenames).skip(1).map(
self.parse_csv, num_parallel_calls=cpu_count())
dataset = dataset.shuffle(buffer_size=self.batch_size * 100)
dataset = dataset.apply(tf.contrib.data.ignore_errors())
dataset = dataset.repeat(self.num_epochs)
        dataset = dataset.batch(self.batch_size)  # TODO: determine the ideal batch size
dataset = dataset.prefetch(self.buffer_size)
iterator = dataset.make_one_shot_iterator()
feats, labs = iterator.get_next()
return feats, labs
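    # Typical wiring of input_fn into an estimator (a sketch; `estimator`,
    # `csv_defaults` and `csv_cols` are assumed to come from elsewhere in the
    # training pipeline):
    #
    #   dataset_input = DatasetInput(
    #       num_epochs=10, batch_size=64, buffer_size=64,
    #       csv_defaults=csv_defaults, csv_cols=csv_cols,
    #       target_var='label', task_type='classification')
    #   estimator.train(input_fn=lambda: dataset_input.input_fn('train'))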
def kmeans_input_fn(self, name, csv_path=None):
"""Input function for kmeans
Arguments:
name : string, Name of the data [Train or Eval]
csv_path : The path of the csv on any storage system
Returns:
A batch of features
"""
pattern = self._get_pattern(name, csv_path)
tf.logging.info('The Pattern of files is : %s', pattern)
df = dd.read_csv(pattern)
vectors = dask.compute(df.values)
return tf.train.limit_epochs(
tf.convert_to_tensor(vectors[0], dtype=tf.float32), num_epochs=1)
def create_feature_columns_wrap(self, dictionary, mean, std_dev):
"""
Wrapper function for returning create_feature_columns function
Arguments:
dictionary : dict, Dictionary with variable names and levels
mean : dask series, mean of the data
            std_dev : dask series, standard deviation of the data
Returns:
_create_feature_columns function
"""
return self._create_feature_columns(dictionary, mean, std_dev)
def _create_feature_columns(self, dictionary, mean, std_dev):
"""Creates an instance of tf.feature columns for each column
in the feature set. Required for canned estimators
Arguments:
dictionary : dict, Dictionary with variable names and levels
mean : dask series, mean of the data
std_dev : dask_series, standard deviation of the data
Returns:
A list of feature column objects based on the dictionary
"""
tmp_mean = 0.0
tmp_std_dev = 0.0
for col, vocab in dictionary.items():
if isinstance(vocab, int):
tmp_mean = mean[col]
tmp_std_dev = std_dev[col]
                # Bind this column's statistics as lambda default arguments so each
                # column keeps its own mean/std instead of the loop's last values.
                feat_col = tf.feature_column.numeric_column(
                    col,
                    normalizer_fn=lambda x, m=tmp_mean, s=tmp_std_dev: (x - m) / s)
else:
feat_col = tf.feature_column.categorical_column_with_vocabulary_list(
col, vocab, num_oov_buckets=1)
self.feat_cols.append(feat_col)
return self.feat_cols
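# Putting the feature-column helpers together (an illustrative sketch; `df`,
# `mean`, `std_dev` and `dataset_input` are assumed to come from the classes above):
#
#   vocab_mapping = BasicStats().find_vocab(df)
#   feature_columns = dataset_input.create_feature_columns_wrap(
#       vocab_mapping, mean, std_dev)
#   # Numeric columns get a z-score normalizer_fn built from mean/std_dev;
#   # categorical columns get a vocabulary list with one out-of-vocabulary bucket.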