example_zoo/tensorflow/models/ncf_main/official/recommendation/ncf_main.py
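
# NOTE: The imports below are reconstructed so this excerpt reads
# self-contained. The exact paths are an assumption based on the TensorFlow
# official models layout, not copied from this file; parse_flags,
# construct_estimator, and log_and_get_hooks are helpers defined elsewhere
# in this module.
import numpy as np
import tensorflow as tf
from absl import flags

from official.datasets import movielens
from official.recommendation import constants as rconst
from official.recommendation import data_pipeline
from official.recommendation import data_preprocessing
from official.utils.logs import mlperf_helper
from official.utils.misc import model_helpers

FLAGS = flags.FLAGS
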
def run_ncf(_):
  """Run NCF training and eval loop."""
  if FLAGS.download_if_missing and not FLAGS.use_synthetic_data:
    movielens.download(FLAGS.dataset, FLAGS.data_dir)

  # Seed NumPy so that negative sampling is reproducible across runs.
  if FLAGS.seed is not None:
    np.random.seed(FLAGS.seed)

  params = parse_flags(FLAGS)
  total_training_cycle = FLAGS.train_epochs // FLAGS.epochs_between_evals
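
  # Example with hypothetical flag values: --train_epochs=20 and
  # --epochs_between_evals=2 yield 20 // 2 = 10 train-then-eval cycles; the
  # integer division silently drops any remainder epochs.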

  if FLAGS.use_synthetic_data:
    # Synthetic mode fabricates input batches; only the user/item counts are
    # taken from the requested dataset's metadata.
    producer = data_pipeline.DummyConstructor()
    num_users, num_items = data_preprocessing.DATASET_TO_NUM_USERS_AND_ITEMS[
        FLAGS.dataset]
    num_train_steps = rconst.SYNTHETIC_BATCHES_PER_EPOCH
    num_eval_steps = rconst.SYNTHETIC_BATCHES_PER_EPOCH
  else:
    num_users, num_items, producer = data_preprocessing.instantiate_pipeline(
        dataset=FLAGS.dataset, data_dir=FLAGS.data_dir, params=params,
        constructor_type=FLAGS.constructor_type,
        deterministic=FLAGS.seed is not None)

    num_train_steps = (producer.train_batches_per_epoch //
                       params["batches_per_step"])
    num_eval_steps = (producer.eval_batches_per_epoch //
                      params["batches_per_step"])
    assert not producer.train_batches_per_epoch % params["batches_per_step"]
    assert not producer.eval_batches_per_epoch % params["batches_per_step"]
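
    # "batches_per_step" is the number of logical batches folded into a single
    # estimator step (typically one per synchronous replica); the asserts
    # above guarantee the per-epoch batch counts divide evenly.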

  producer.start()
  params["num_users"], params["num_items"] = num_users, num_items

  # Remove the model directory if --clean is set, then build the estimator
  # and the logging/benchmark hooks.
  model_helpers.apply_clean(flags.FLAGS)

  estimator = construct_estimator(model_dir=FLAGS.model_dir, params=params)

  benchmark_logger, train_hooks = log_and_get_hooks(params["eval_batch_size"])
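
  # train_hooks presumably come from the shared hooks_helper machinery
  # (configured via --hooks), and benchmark_logger records eval metrics in a
  # structured, machine-readable form.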

  target_reached = False
  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_LOOP)
  for cycle_index in range(total_training_cycle):
    # MLPerf compliance logging assumes exactly one epoch per cycle.
    assert FLAGS.epochs_between_evals == 1 or not mlperf_helper.LOGGER.enabled

    tf.logging.info("Starting a training cycle: {}/{}".format(
        cycle_index + 1, total_training_cycle))

    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.TRAIN_EPOCH,
                            value=cycle_index)

    train_input_fn = producer.make_input_fn(is_training=True)
    estimator.train(input_fn=train_input_fn, hooks=train_hooks,
                    steps=num_train_steps)
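
    # Note: the producer regenerates training negatives between epochs, which
    # is why a fresh input_fn is built for every cycle rather than cached.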
tf.logging.info("Beginning evaluation.")
eval_input_fn = producer.make_input_fn(is_training=False)
mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_START,
value=cycle_index)
eval_results = estimator.evaluate(eval_input_fn, steps=num_eval_steps)
tf.logging.info("Evaluation complete.")
    hr = float(eval_results[rconst.HR_KEY])
    ndcg = float(eval_results[rconst.NDCG_KEY])
    loss = float(eval_results["loss"])
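
    # HR is the fraction of users whose held-out item appears in the model's
    # top-K recommendations among the sampled negatives; NDCG additionally
    # rewards placing that item nearer the top of the list.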

    mlperf_helper.ncf_print(
        key=mlperf_helper.TAGS.EVAL_TARGET,
        value={"epoch": cycle_index, "value": FLAGS.hr_threshold})
    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_ACCURACY,
                            value={"epoch": cycle_index, "value": hr})
    mlperf_helper.ncf_print(
        key=mlperf_helper.TAGS.EVAL_HP_NUM_NEG,
        value={"epoch": cycle_index, "value": rconst.NUM_EVAL_NEGATIVES})

    mlperf_helper.ncf_print(key=mlperf_helper.TAGS.EVAL_STOP, value=cycle_index)
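
    # The EVAL_* calls above emit MLPerf-compliance tags; ncf_print is a no-op
    # unless the MLPerf logger was enabled for this run.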

    # Record the evaluation results with the benchmark logger.
    benchmark_logger.log_evaluation_result(eval_results)

    # Log the HR and NDCG results.
    tf.logging.info(
        "Iteration {}: HR = {:.4f}, NDCG = {:.4f}, Loss = {:.4f}".format(
            cycle_index + 1, hr, ndcg, loss))

    # Stop training early once the hit-rate target has been reached.
    if model_helpers.past_stop_threshold(FLAGS.hr_threshold, hr):
      target_reached = True
      break
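
  # target_reached feeds the "success" field of the MLPerf RUN_STOP tag below.
  # model_helpers.past_stop_threshold treats a None threshold as "never stop",
  # so early stopping is effectively optional.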
  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_STOP,
                          value={"success": target_reached})

  # Shut down the asynchronous data producer before exiting.
  producer.stop_loop()
  producer.join()

  # Clear the Keras session explicitly to avoid a session-delete error at
  # interpreter shutdown.
  tf.keras.backend.clear_session()
  mlperf_helper.ncf_print(key=mlperf_helper.TAGS.RUN_FINAL)
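
# A minimal sketch of how run_ncf is typically wired up (assumption: the real
# module defines its flags and absl entry point elsewhere; this is
# illustrative, not copied from the file):
#
#   from absl import app
#
#   if __name__ == "__main__":
#     tf.logging.set_verbosity(tf.logging.INFO)
#     app.run(run_ncf)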