def execute_task()

in community-content/tf_agents_bandits_movie_recommendation_with_kfp_and_vertex_sdk/step_by_step_sdk_tf_agents_bandits_movie_recommendation/src/training/task.py


def execute_task(args: argparse.Namespace,
                 best_hyperparameters_blob: Union[storage.Blob, None],
                 hypertune_client: Union[hypertune.HyperTune, None]) -> None:
  """Executes training, or hyperparameter tuning, for the policy.

  Parses parameters and hyperparameters from the command line, reads best
  hyperparameters if applicable, constructs the logical modules for RL, and
  executes training or hyperparameter tuning. Tracks the training process
  and resources using TensorBoard Profiler if applicable.

  Args:
    args: An argparse.Namespace object of (hyper)parameter values.
    best_hyperparameters_blob: An object containing best hyperparameters in
      Google Cloud Storage.
    hypertune_client: Client for submitting hyperparameter tuning metrics.
  """
  # [Do Not Change] Set the root directory for training artifacts.
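  # AIP_MODEL_DIR is an environment variable set by Vertex AI custom training
  # and points at the Cloud Storage location for exported artifacts.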
  root_dir = (os.environ["AIP_MODEL_DIR"]
              if not args.run_hyperparameter_tuning else "")

  # Use best hyperparameters learned from a previous hyperparameter tuning job.
  logging.info("train_with_best_hyperparameters: %s",
               args.train_with_best_hyperparameters)
  if args.train_with_best_hyperparameters:
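    # The best hyperparameters are stored as a JSON blob in Cloud Storage.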
    best_hyperparameters = json.loads(
        best_hyperparameters_blob.download_as_string())
    if "BATCH_SIZE" in best_hyperparameters:
      args.batch_size = best_hyperparameters["BATCH_SIZE"]
    if "TRAINING_LOOPS" in best_hyperparameters:
      args.training_loops = best_hyperparameters["TRAINING_LOOPS"]
    if "STEPS_PER_LOOP" in best_hyperparameters:
      args.steps_per_loop = best_hyperparameters["STEPS_PER_LOOP"]

  # Define RL environment.
  env = movielens_py_environment.MovieLensPyEnvironment(
      args.data_path, args.rank_k, args.batch_size,
      num_movies=args.num_actions, csv_delimiter="\t")
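  # Wrap the Python environment so it can be driven by TF-based training code.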
  environment = tf_py_environment.TFPyEnvironment(env)

  # Define RL agent/algorithm.
  agent = lin_ucb_agent.LinearUCBAgent(
      time_step_spec=environment.time_step_spec(),
      action_spec=environment.action_spec(),
      tikhonov_weight=args.tikhonov_weight,
      alpha=args.agent_alpha,
      dtype=tf.float32,
      accepts_per_arm_features=PER_ARM)
  logging.info("TimeStep Spec (for each batch):\n%s\n", agent.time_step_spec)
  logging.info("Action Spec (for each batch):\n%s\n", agent.action_spec)
  logging.info("Reward Spec (for each batch):\n%s\n", environment.reward_spec())

  # Define RL metric.
  optimal_reward_fn = functools.partial(
      environment_utilities.compute_optimal_reward_with_movielens_environment,
      environment=environment)
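  # Regret measures the gap between the optimal reward and the reward received.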
  regret_metric = tf_bandit_metrics.RegretMetric(optimal_reward_fn)
  metrics = [regret_metric]

  # Perform on-policy training with the simulation MovieLens environment.
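  # Optionally profile the training loop; traces can be inspected in
  # TensorBoard's Profiler tab.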
  if args.profiler_dir is not None:
    tf.profiler.experimental.start(args.profiler_dir)
  metric_results = policy_util.train(
      agent=agent,
      environment=environment,
      training_loops=args.training_loops,
      steps_per_loop=args.steps_per_loop,
      additional_metrics=metrics,
      run_hyperparameter_tuning=args.run_hyperparameter_tuning,
      root_dir=root_dir if not args.run_hyperparameter_tuning else None,
      artifacts_dir=args.artifacts_dir
      if not args.run_hyperparameter_tuning else None)
  if args.profiler_dir is not None:
    tf.profiler.experimental.stop()

  # Report training metrics to Vertex AI for hyperparameter tuning.
  if args.run_hyperparameter_tuning:
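    # Report the final loop's average return as the objective for this trial.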
    hypertune_client.report_hyperparameter_tuning_metric(
        hyperparameter_metric_tag="final_average_return",
        metric_value=metric_results["AverageReturnMetric"][-1])
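

A minimal sketch of a command-line entry point that could drive execute_task for a plain training run. The get_args helper, the flag names, and the default values below are assumptions inferred from the attributes read off args above, not the repository's actual parser; only the attribute names (data_path, rank_k, batch_size, and so on) come from the function itself.

import argparse


def get_args() -> argparse.Namespace:
  """Hypothetical parser mirroring the attributes execute_task reads from args."""
  parser = argparse.ArgumentParser()
  parser.add_argument("--data-path", type=str, required=True)
  parser.add_argument("--rank-k", type=int, default=20)
  parser.add_argument("--num-actions", type=int, default=20)
  parser.add_argument("--batch-size", type=int, default=8)
  parser.add_argument("--training-loops", type=int, default=4)
  parser.add_argument("--steps-per-loop", type=int, default=2)
  parser.add_argument("--tikhonov-weight", type=float, default=0.001)
  parser.add_argument("--agent-alpha", type=float, default=10.0)
  parser.add_argument("--artifacts-dir", type=str, default=None)
  parser.add_argument("--profiler-dir", type=str, default=None)
  parser.add_argument("--run-hyperparameter-tuning", action="store_true")
  parser.add_argument("--train-with-best-hyperparameters", action="store_true")
  return parser.parse_args()


if __name__ == "__main__":
  # Plain training run: no tuning trial to report and no stored best
  # hyperparameters to download, so both optional arguments are None.
  execute_task(get_args(), best_hyperparameters_blob=None,
               hypertune_client=None)

For a hyperparameter-tuning trial, a hypertune.HyperTune client would be passed instead so that the final average return can be reported back to Vertex AI, as shown at the end of execute_task.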