in adanet/core/estimator_distributed_test_runner.py [0:0]
def train_and_evaluate_estimator():
  """Runs Estimator distributed training."""

  # The tf.estimator.RunConfig automatically parses the TF_CONFIG environment
  # variable during construction.
  # For more information on how tf.estimator.RunConfig uses TF_CONFIG, see
  # https://www.tensorflow.org/api_docs/python/tf/estimator/RunConfig.
  config = tf.estimator.RunConfig(
      tf_random_seed=42,
      save_checkpoints_steps=10,
      save_checkpoints_secs=None,
      # Keep all checkpoints to avoid checkpoint GC causing failures during
      # evaluation.
      # TODO: Prevent checkpoints that are currently being
      # evaluated by another process from being garbage collected.
      keep_checkpoint_max=None,
      model_dir=FLAGS.model_dir,
      session_config=tf_compat.v1.ConfigProto(
          log_device_placement=False,
          # Ignore other workers; only talk to parameter servers.
          # Otherwise, when a chief/worker terminates, the others will hang.
          device_filters=["/job:ps"]))
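
  # Illustrative only (not in the original file): a minimal TF_CONFIG value
  # that the RunConfig above would parse for one chief, one worker, and one
  # parameter server. The real cluster spec and ports are assembled by the
  # surrounding test runner, so treat this as a sketch of the format:
  #
  #   TF_CONFIG='{"cluster": {"chief": ["localhost:2222"],
  #                           "worker": ["localhost:2223"],
  #                           "ps": ["localhost:2224"]},
  #               "task": {"type": "worker", "index": 0}}'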

  def input_fn():
    # `features` and `labels` are assigned below; they are only looked up when
    # input_fn is actually called, after those assignments have run.
    input_features = {"x": tf.constant(features, name="x")}
    input_labels = tf.constant(labels, name="y")
    return tf.data.Dataset.from_tensors((input_features, input_labels)).repeat()

  kwargs = {
      "max_iteration_steps": 100,
      "force_grow": True,
      "delay_secs_per_worker": .2,
      "max_worker_delay_secs": 1,
      "worker_wait_secs": 1,
      # Set a low timeout to reduce wait time for failures.
      "worker_wait_timeout_secs": 180,
      "evaluator": Evaluator(input_fn, steps=10),
      "config": config
  }

  head = head_lib._regression_head(  # pylint: disable=protected-access
      loss_reduction=tf.losses.Reduction.SUM_OVER_BATCH_SIZE)
  features = [[1., 0.], [0., 0.], [0., 1.], [1., 1.]]
  labels = [[1.], [0.], [1.], [0.]]

  estimator_type = FLAGS.estimator_type
  if FLAGS.placement_strategy == "round_robin":
    kwargs["experimental_placement_strategy"] = RoundRobinStrategy()
  if estimator_type == "autoensemble":
    feature_columns = [tf.feature_column.numeric_column("x", shape=[2])]
    # pylint: disable=g-long-lambda
    # TODO: Switch optimizers to tf.keras.optimizers.Adam once the
    # distribution bug is fixed.
    candidate_pool = {
        "linear":
            tf.estimator.LinearEstimator(
                head=head,
                feature_columns=feature_columns,
                optimizer=lambda: tf_compat.v1.train.AdamOptimizer(
                    learning_rate=.001)),
        "dnn":
            tf.estimator.DNNEstimator(
                head=head,
                feature_columns=feature_columns,
                optimizer=lambda: tf_compat.v1.train.AdamOptimizer(
                    learning_rate=.001),
                hidden_units=[3]),
        "dnn2":
            tf.estimator.DNNEstimator(
                head=head,
                feature_columns=feature_columns,
                optimizer=lambda: tf_compat.v1.train.AdamOptimizer(
                    learning_rate=.001),
                hidden_units=[10, 10]),
    }
    # pylint: enable=g-long-lambda

    estimator = AutoEnsembleEstimator(
        head=head, candidate_pool=candidate_pool, **kwargs)
  elif estimator_type == "estimator":
    subnetwork_generator = SimpleGenerator([
        _DNNBuilder("dnn1", config, layer_size=3),
        _DNNBuilder("dnn2", config, layer_size=4),
        _DNNBuilder("dnn3", config, layer_size=5),
    ])

    estimator = Estimator(
        head=head, subnetwork_generator=subnetwork_generator, **kwargs)
  elif estimator_type == "autoensemble_trees_multiclass":
    if not bt_losses:
      logging.warning(
          "Skipped autoensemble_trees_multiclass test since contrib is missing."
      )
      return

    n_classes = 3
    head = head_lib._multi_class_head_with_softmax_cross_entropy_loss(  # pylint: disable=protected-access
        n_classes=n_classes,
        loss_reduction=tf.losses.Reduction.SUM_OVER_BATCH_SIZE)

    def tree_loss_fn(labels, logits):
      result = bt_losses.per_example_maxent_loss(
          labels=labels, logits=logits, num_classes=n_classes, weights=None)
      return result[0]

    tree_head = head_lib._multi_class_head_with_softmax_cross_entropy_loss(  # pylint: disable=protected-access
        loss_fn=tree_loss_fn,
        n_classes=n_classes,
        loss_reduction=tf.losses.Reduction.SUM_OVER_BATCH_SIZE)
    labels = [[1], [0], [1], [2]]
    feature_columns = [tf.feature_column.numeric_column("x", shape=[2])]

    # TODO: Switch optimizers to tf.keras.optimizers.Adam once the
    # distribution bug is fixed.
    candidate_pool = lambda config: {  # pylint: disable=g-long-lambda
        "linear":
            tf.estimator.LinearEstimator(
                head=head,
                feature_columns=feature_columns,
                optimizer=tf_compat.v1.train.AdamOptimizer(
                    learning_rate=.001),
                config=config),
        "gbdt":
            tf.estimator.BoostedTreesEstimator(
                head=tree_head,
                feature_columns=feature_columns,
                n_trees=10,
                n_batches_per_layer=1,
                center_bias=False,
                config=config),
    }

    estimator = AutoEnsembleEstimator(
        head=head, candidate_pool=candidate_pool, **kwargs)
  elif estimator_type == "estimator_with_experimental_multiworker_strategy":

    def _model_fn(features, labels, mode):
      """Test model_fn."""
      layer = tf.keras.layers.Dense(1)
      logits = layer(features["x"])

      if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {"logits": logits}
        return tf.estimator.EstimatorSpec(mode, predictions=predictions)

      loss = tf.losses.mean_squared_error(
          labels=labels,
          predictions=logits,
          reduction=tf.losses.Reduction.SUM_OVER_BATCH_SIZE)

      if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss)

      if mode == tf.estimator.ModeKeys.TRAIN:
        optimizer = tf.train.GradientDescentOptimizer(0.2)
        train_op = optimizer.minimize(
            loss, global_step=tf.train.get_global_step())
        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

    if json.loads(os.environ["TF_CONFIG"])["task"]["type"] == "evaluator":
      # The evaluator job would crash if MultiWorkerMirroredStrategy is called.
      distribution = None
    else:
      distribution = tf.distribute.experimental.MultiWorkerMirroredStrategy()
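
    # Illustrative only (not in the original file): the "task" entry of
    # TF_CONFIG identifies this process's role, so for the evaluator process
    # the check above sees something like:
    #
    #   {"task": {"type": "evaluator", "index": 0}, ...}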
    multiworker_config = tf.estimator.RunConfig(
        tf_random_seed=42,
        model_dir=FLAGS.model_dir,
        train_distribute=distribution,
        session_config=tf_compat.v1.ConfigProto(log_device_placement=False))
    # TODO: Replace with adanet.Estimator. Currently this just verifies
    # that the distributed testing framework supports distribute strategies.
    estimator = tf.estimator.Estimator(
        model_fn=_model_fn, config=multiworker_config)

  train_hooks = [
      tf.estimator.ProfilerHook(save_steps=50, output_dir=FLAGS.model_dir)
  ]
  # Train for three AdaNet iterations (max_iteration_steps=100, max_steps=300).
  train_spec = tf.estimator.TrainSpec(
      input_fn=input_fn, max_steps=300, hooks=train_hooks)
  eval_spec = tf.estimator.EvalSpec(
      input_fn=input_fn, steps=1, start_delay_secs=.5, throttle_secs=.05)

  # Calling train_and_evaluate is the official way to perform distributed
  # training with an Estimator. Calling Estimator#train directly results
  # in an error when TF_CONFIG is set up for a cluster.
  tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)
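

# Illustrative sketch (not part of the original file): how a per-process
# harness might invoke the runner above. The helper name, its parameters, and
# the task assignment are hypothetical; the real test runner constructs
# TF_CONFIG and the flags itself before calling train_and_evaluate_estimator().
def _example_launch(task_type, task_index, cluster_spec):
  """Hypothetical helper: runs one process of a cluster, for illustration."""
  os.environ["TF_CONFIG"] = json.dumps({
      "cluster": cluster_spec,
      "task": {"type": task_type, "index": task_index},
  })
  train_and_evaluate_estimator()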