# tools/agile-machine-learning-api/codes/trainer/launch_demo.py
def run_experiment(hparams):
"""
Arguments:
hparams : tf.contrib.training.HParams object, contains all the arguments
as a set of key value pairs
Sets up the experiment to be launched on cloud machine learning engine
"""
    a = time.time()
    _, csv_cols, csv_defaults, mapped, mean, std_dev = prep_input(
        csv_path=hparams.train_csv_path,
        task_type=hparams.task_type,
        target_var=hparams.target_var,
        na_values=hparams.na_values,
        column_names=hparams.column_names,
        to_drop=hparams.to_drop,
        gcs_path=hparams.gcs_path,
        data_type=hparams.data_type,
        name='train')
    _, _, _, _, _, _ = prep_input(
        csv_path=hparams.eval_csv_path,
        task_type=hparams.task_type,
        target_var=hparams.target_var,
        na_values=hparams.na_values,
        column_names=hparams.column_names,
        to_drop=hparams.to_drop,
        gcs_path=hparams.gcs_path,
        data_type=hparams.data_type,
        name='eval')
    data = DatasetInput(
        num_epochs=hparams.num_epochs,
        batch_size=hparams.batch_size,
        buffer_size=hparams.buffer_size,
        csv_defaults=csv_defaults,
        csv_cols=csv_cols,
        target_var=hparams.target_var,
        task_type=hparams.task_type,
        condition=hparams.condition)
    feature_cols = data.create_feature_columns_wrap(
        dictionary=mapped,
        mean=mean,
        std_dev=std_dev)
    b = time.time()
    tf.logging.info('Parse time is : %s', b - a)
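
    # KMeans clustering uses its own input pipeline; every other model shares
    # the generic CSV-backed input_fn.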
    if hparams.name == 'kmeanscluster':
        def train_input():
            return data.kmeans_input_fn('train')

        def eval_input():
            return data.kmeans_input_fn('eval')
    else:
        def train_input():
            return data.input_fn('train')

        def eval_input():
            return data.input_fn('eval')
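
    # Serving-time input receivers: one placeholder per feature column for
    # JSON requests, and a single string placeholder for raw CSV rows.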
    def json_serving_input_fn():
        """
        Build the serving inputs.

        Returns: Serving input function for JSON data
        """
        inputs = {}
        for feat in feature_cols:
            inputs[feat.name] = tf.placeholder(
                shape=[None], dtype=feat.dtype, name=feat.name)
        return tf.estimator.export.ServingInputReceiver(inputs, inputs)
    def parse_csv(rows_string_tensor):
        """
        Takes the string input tensor and returns a dict of rank-2 tensors.

        Arguments:
            rows_string_tensor : tf.Tensor object, Tensor of the prediction datapoint

        Returns:
            features : tensor objects of features for inference
        """
        columns = tf.decode_csv(
            rows_string_tensor, record_defaults=csv_defaults)
        features = dict(zip(csv_cols, columns))
        for key, _ in six.iteritems(features):
            features[key] = tf.expand_dims(features[key], -1)
        return features

    def csv_serving_input_fn():
        """
        Build the serving inputs.

        Returns: Serving input function for CSV data
        """
        csv_row = tf.placeholder(
            shape=[None],
            dtype=tf.string)
        features = parse_csv(rows_string_tensor=csv_row)
        return tf.estimator.export.ServingInputReceiver(
            features,
            {'csv_row': csv_row})
    serving_functions = {
        'JSON': json_serving_input_fn,
        'CSV': csv_serving_input_fn
    }
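
    # Run configuration: checkpoint and summary cadence, random seed, and the
    # distribution strategy used for training.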
    config_obj = Config(
        model_dir=hparams.job_dir,
        tf_random_seed=hparams.seed,
        save_summary_steps=hparams.save_summary_steps,
        session_config=None,
        save_checkpoints_secs=hparams.save_checkpoints_secs,
        save_checkpoints_steps=hparams.save_checkpoints_steps,
        keep_checkpoint_max=hparams.keep_checkpoint_max,
        keep_checkpoint_every_n_hours=hparams.keep_checkpoint_every_n_hours,
        log_step_count_steps=hparams.log_step_count_steps,
        train_distribute=hparams.distribute_strategy)
    config_obj.set_config()
    config = config_obj.get_config()
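
    # Optimizer factories for the linear and deep parts of the canned models
    # and for the custom polynomial models, each built from its own
    # hyperparameters.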
    opt = Optimizer()

    def linear_optimizer():
        return opt.set_opt_wrap(
            hparams.lin_opt,
            hparams.learning_rate,
            hparams.lr_rate_decay)

    def deep_optimizer():
        return opt.set_opt_wrap(
            hparams.deep_opt,
            hparams.learning_rate,
            hparams.lr_rate_decay)

    def poly_optimizer():
        return opt.set_opt_wrap(
            hparams.poly_opt,
            hparams.learning_rate,
            hparams.lr_rate_decay)
    deep_cols = create_deep_cols(feature_cols, hparams.name)
    hidden_units = [hparams.hidden_units]
    feature_names = list(csv_cols)
    feature_names.remove(hparams.target_var)
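
    # Pre-made (canned) estimators cover every model name except the two
    # polynomial models, which are built as custom estimators.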
    if hparams.name not in ['polynomialclassifier', 'polynomialregressor']:
        model = CannedModel(
            model_name=hparams.name,
            feature_columns=feature_cols,
            deep_columns=deep_cols,
            hidden_units=hidden_units,
            n_classes=hparams.n_classes,
            linear_optimizer=linear_optimizer,
            dnn_optimizer=deep_optimizer,
            activation_fn=hparams.activation_fn,
            dropout=hparams.dropout,
            batch_norm=hparams.batch_norm,
            config=config)
    else:
        model = CustomModel(
            model_name=hparams.name,
            batch_size=hparams.batch_size,
            optimizer=poly_optimizer,
            model_dir=hparams.job_dir,
            config=config,
            feature_names=feature_names,
            learning_rate=hparams.learning_rate)
    def mean_acc_metric(labels, predictions):
        """
        Mean per-class accuracy metric.

        Arguments:
            labels : labels of the data
            predictions : predictions of the model

        Returns: mean per-class accuracy computed over the given labels and predictions
        """
        return mean_acc(labels, predictions, hparams.n_classes)
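
    # Build the estimator and attach extra evaluation metrics: AUC for binary
    # classification, mean per-class accuracy when n_classes > 2, and rmse
    # plus mar otherwise.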
    estimator = model.build_model()
    if data.task_type == 'classification' and hparams.n_classes == 2:
        estimator = tf.contrib.estimator.add_metrics(estimator, my_auc)
    elif hparams.n_classes > 2:
        estimator = tf.contrib.estimator.add_metrics(
            estimator, mean_acc_metric)
    else:
        estimator = tf.contrib.estimator.add_metrics(estimator, rmse)
        estimator = tf.contrib.estimator.add_metrics(estimator, mar)
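
    # Either run a manual train/eval loop with simple early stopping on the
    # eval loss, or hand control to the standard train_and_evaluate flow.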
    if hparams.early_stopping:
        old_loss = np.inf
        for _ in range(hparams.eval_times):
            estimator.train(input_fn=train_input,
                            steps=hparams.train_steps // hparams.eval_times)
            output = estimator.evaluate(
                input_fn=eval_input, steps=hparams.eval_steps)
            loss = output['loss']
            if loss >= old_loss:
                tf.logging.info(
                    'EARLY STOPPING....... LOSS SATURATED AT : %s', loss)
                break
            else:
                old_loss = loss
    else:
        train_spec = tf.estimator.TrainSpec(
            train_input,
            hparams.train_steps)
        eval_spec = tf.estimator.EvalSpec(
            eval_input,
            hparams.eval_steps,
            throttle_secs=hparams.eval_freq)
        tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
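
    # Only the chief worker exports the SavedModel, bundling the LIME
    # explainer file as an extra asset.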
    if config_obj.get_is_chief():
        estimator.export_savedmodel(
            hparams.export_dir,
            serving_functions[hparams.export_format],
            assets_extra={
                'lime_explainer': '/tmp/lime_explainer'},
            strip_default_attrs=False)
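
# Illustrative only, not part of the original module: a minimal sketch of how
# run_experiment might be invoked, assuming the hyperparameters are packed
# into a tf.contrib.training.HParams object as the docstring describes. All
# paths and values below are hypothetical, and only a subset of the keys this
# function reads is shown.
#
#     hparams = tf.contrib.training.HParams(
#         train_csv_path='gs://my-bucket/data/train.csv',
#         eval_csv_path='gs://my-bucket/data/eval.csv',
#         task_type='classification',
#         target_var='label',
#         name='polynomialclassifier',
#         job_dir='gs://my-bucket/jobs/demo',
#         ...)  # plus the remaining keys read above
#     run_experiment(hparams)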