in community-content/tf_agents_bandits_movie_recommendation_with_kfp_and_vertex_sdk/step_by_step_sdk_tf_agents_bandits_movie_recommendation/src/training/task.py [0:0]
def execute_task(args: argparse.Namespace,
                 best_hyperparameters_blob: Union[storage.Blob, None],
                 hypertune_client: Union[hypertune.HyperTune, None]) -> None:
"""Executes training, or hyperparameter tuning, for the policy.
Parses parameters and hyperparameters from the command line, reads best
hyperparameters if applicable, constructs the logical modules for RL, and
executes training or hyperparameter tuning. Tracks the training process
and resources using TensorBoard Profiler if applicable.
Args:
args: An argpase.Namespace object of (hyper)parameter values.
best_hyperparameters_blob: An object containing best hyperparameters in
Google Cloud Storage.
hypertune_client: Client for submitting hyperparameter tuning metrics.
"""
  # [Do Not Change] Set the root directory for training artifacts.
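  # Vertex AI custom training injects the artifact output directory through
  # the AIP_MODEL_DIR environment variable; hyperparameter tuning trials do
  # not export a trained policy, so they leave the root directory empty.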
  root_dir = os.environ[
      "AIP_MODEL_DIR"] if not args.run_hyperparameter_tuning else ""

  # Use best hyperparameters learned from a previous hyperparameter tuning job.
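  # The blob is expected to hold a JSON object keyed by hyperparameter name
  # (e.g. "BATCH_SIZE"); only keys present in it override the command-line
  # values.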
  logging.info("Train with best hyperparameters: %s",
               args.train_with_best_hyperparameters)
  if args.train_with_best_hyperparameters:
    best_hyperparameters = json.loads(
        best_hyperparameters_blob.download_as_string())
    if "BATCH_SIZE" in best_hyperparameters:
      args.batch_size = best_hyperparameters["BATCH_SIZE"]
    if "TRAINING_LOOPS" in best_hyperparameters:
      args.training_loops = best_hyperparameters["TRAINING_LOOPS"]
    if "STEPS_PER_LOOP" in best_hyperparameters:
      args.steps_per_loop = best_hyperparameters["STEPS_PER_LOOP"]

  # Define RL environment.
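  # The simulation serves observations in batches of `batch_size`; each
  # observation is a `rank_k`-dimensional user context vector derived from a
  # low-rank factorization of the MovieLens ratings matrix, and each of the
  # `num_actions` movies is an arm.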
  env = movielens_py_environment.MovieLensPyEnvironment(
      args.data_path, args.rank_k, args.batch_size,
      num_movies=args.num_actions, csv_delimiter="\t")
  environment = tf_py_environment.TFPyEnvironment(env)

  # Define RL agent/algorithm.
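  # LinUCB maintains a linear reward estimate per arm: `alpha` scales the
  # upper-confidence exploration bonus and `tikhonov_weight` is the Tikhonov
  # (L2) regularization applied to the estimator. PER_ARM toggles whether the
  # agent consumes per-arm features from the observation.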
  agent = lin_ucb_agent.LinearUCBAgent(
      time_step_spec=environment.time_step_spec(),
      action_spec=environment.action_spec(),
      tikhonov_weight=args.tikhonov_weight,
      alpha=args.agent_alpha,
      dtype=tf.float32,
      accepts_per_arm_features=PER_ARM)
  logging.info("TimeStep Spec (for each batch):\n%s\n", agent.time_step_spec)
  logging.info("Action Spec (for each batch):\n%s\n", agent.action_spec)
  logging.info("Reward Spec (for each batch):\n%s\n",
               environment.reward_spec())

  # Define RL metric.
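  # Regret here is the gap between the reward of the optimal action (computed
  # by `optimal_reward_fn` from the simulation environment) and the reward the
  # agent actually obtains.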
  optimal_reward_fn = functools.partial(
      environment_utilities.compute_optimal_reward_with_movielens_environment,
      environment=environment)
  regret_metric = tf_bandit_metrics.RegretMetric(optimal_reward_fn)
  metrics = [regret_metric]

  # Perform on-policy training with the simulation MovieLens environment.
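  # Optionally capture a TensorBoard Profiler trace of the training loop; the
  # trace written to `profiler_dir` can be inspected later in TensorBoard's
  # Profile tab.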
  if args.profiler_dir is not None:
    tf.profiler.experimental.start(args.profiler_dir)

  metric_results = policy_util.train(
      agent=agent,
      environment=environment,
      training_loops=args.training_loops,
      steps_per_loop=args.steps_per_loop,
      additional_metrics=metrics,
      run_hyperparameter_tuning=args.run_hyperparameter_tuning,
      root_dir=root_dir if not args.run_hyperparameter_tuning else None,
      artifacts_dir=args.artifacts_dir
      if not args.run_hyperparameter_tuning else None)

  if args.profiler_dir is not None:
    tf.profiler.experimental.stop()

  # Report the final training metric to Vertex AI for hyperparameter tuning.
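  # The last value of AverageReturnMetric is reported as the objective
  # ("final_average_return") that the Vertex AI hyperparameter tuning job
  # optimizes over trials.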
  if args.run_hyperparameter_tuning:
    hypertune_client.report_hyperparameter_tuning_metric(
        hyperparameter_metric_tag="final_average_return",
        metric_value=metric_results["AverageReturnMetric"][-1])