def _define_collect()

in tensor2tensor/rl/ppo_learner.py [0:0]


def _define_collect(batch_env, ppo_hparams, scope, frame_stack_size, eval_phase,
                    sampling_temp, force_beginning_resets,
                    distributional_size=1):
  """Collect trajectories.

  Args:
    batch_env: Batch environment.
    ppo_hparams: PPO hparams, defined in tensor2tensor.models.research.rl.
    scope: var scope.
    frame_stack_size: Number of last observations to feed into the policy.
    eval_phase: Whether collection runs in evaluation mode; if True, the
      rollout continues until every agent has finished an episode instead of
      running for a fixed epoch_length (see stop_condition below).
    sampling_temp: Sampling temperature for the policy.
    force_beginning_resets: Whether to force resetting all environments at the
      beginning of the rollout.
    distributional_size: optional, number of buckets in distributional RL.

  Returns:
    A tuple (memory, summaries, initialization_lambda), where memory holds the
    rollout tensors (observations, rewards, dones, actions, pdfs,
    value_functions) collected from the nested, wrapped environment.
  """
  epoch_length = ppo_hparams.epoch_length

  to_initialize = []
  with tf.variable_scope(scope, reuse=tf.AUTO_REUSE):
    num_agents = batch_env.batch_size

    to_initialize.append(batch_env)
    wrappers = [(StackWrapper, {
        "history": frame_stack_size
    }), (_MemoryWrapper, {})]
    rollout_metadata = None
    speculum = None
    for w in wrappers:
      tf.logging.info("Applying wrapper %s(%s) to env %s." % (str(
          w[0]), str(w[1]), str(batch_env)))
      batch_env = w[0](batch_env, **w[1])
      to_initialize.append(batch_env)

    rollout_metadata = _rollout_metadata(batch_env, distributional_size)
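    # rollout_metadata is a list of (shape, dtype, name) triples describing the
    # per-step tensors (observations, rewards, dones, actions, pdfs, value
    # functions) that the collect memory below stores for every time step.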
    speculum = batch_env.speculum

    def initialization_lambda(sess):
      for batch_env in to_initialize:
        batch_env.initialize(sess)

    memory = [
        tf.get_variable(  # pylint: disable=g-complex-comprehension
            "collect_memory_%d_%s" % (epoch_length, name),
            shape=[epoch_length] + shape,
            dtype=dtype,
            initializer=tf.zeros_initializer(),
            trainable=False) for (shape, dtype, name) in rollout_metadata
    ]

    cumulative_rewards = tf.get_variable(
        "cumulative_rewards", len(batch_env), trainable=False)

    eval_phase_t = tf.convert_to_tensor(eval_phase)
    should_reset_var = tf.Variable(True, trainable=False)
    zeros_tensor = tf.zeros(len(batch_env))

  force_beginning_resets = tf.convert_to_tensor(force_beginning_resets)
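  # Reset every environment and zero the cumulative rewards on the very first
  # step (should_reset_var starts as True) or whenever force_beginning_resets
  # is set.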

  def reset_ops_group():
    return tf.group(
        batch_env.reset(tf.range(len(batch_env))),
        tf.assign(cumulative_rewards, zeros_tensor))

  reset_op = tf.cond(
      tf.logical_or(should_reset_var.read_value(), force_beginning_resets),
      reset_ops_group, tf.no_op)

  with tf.control_dependencies([reset_op]):
    reset_once_op = tf.assign(should_reset_var, False)

  with tf.control_dependencies([reset_once_op]):

    def step(index, scores_sum, scores_num):
      """Single step."""
      index %= epoch_length  # Only needed in eval runs.
      # Note - the only way to ensure making a copy of tensor is to run simple
      # operation. We are waiting for tf.copy:
      # https://github.com/tensorflow/tensorflow/issues/11186
      obs_copy = batch_env.observ + 0
      value_fun_shape = (num_agents,)
      if distributional_size > 1:
        value_fun_shape = (num_agents, distributional_size)

      def env_step(arg1, arg2, arg3):  # pylint: disable=unused-argument
        """Step of the environment."""

        (logits, value_function) = get_policy(
            obs_copy, ppo_hparams, batch_env.action_space, distributional_size
        )
        action = common_layers.sample_with_temperature(logits, sampling_temp)
        action = tf.cast(action, tf.int32)
        action = tf.reshape(action, shape=(num_agents,))

        reward, done = batch_env.simulate(action)

        pdf = tfp.distributions.Categorical(logits=logits).prob(action)
        pdf = tf.reshape(pdf, shape=(num_agents,))
        value_function = tf.reshape(value_function, shape=value_fun_shape)
        done = tf.reshape(done, shape=(num_agents,))

        with tf.control_dependencies([reward, done]):
          return tf.identity(pdf), tf.identity(value_function), \
                 tf.identity(done)

      # TODO(piotrmilos): while_body is executed at most once,
      # thus should be replaced with tf.cond
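      # The body runs only while the speculum queue is empty; the _MemoryWrapper
      # applied above enqueues the transition during simulate(), so env_step
      # executes at most once per call of step().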
      pdf, value_function, top_level_done = tf.while_loop(
          lambda _1, _2, _3: tf.equal(speculum.size(), 0),
          env_step,
          [
              tf.constant(0.0, shape=(num_agents,)),
              tf.constant(0.0, shape=value_fun_shape),
              tf.constant(False, shape=(num_agents,))
          ],
          parallel_iterations=1,
          back_prop=False,
      )

      with tf.control_dependencies([pdf, value_function]):
        obs, reward, done, action = speculum.dequeue()
        to_save = [obs, reward, done, action, pdf, value_function]
        save_ops = [
            tf.scatter_update(memory_slot, index, value)
            for memory_slot, value in zip(memory, to_save)
        ]
        cumulate_rewards_op = cumulative_rewards.assign_add(reward)

        agent_indices_to_reset = tf.where(top_level_done)[:, 0]
      with tf.control_dependencies([cumulate_rewards_op]):
        # TODO(piotrmilos): possibly we need cumulative_rewards.read_value()
        scores_sum_delta = tf.reduce_sum(
            tf.gather(cumulative_rewards.read_value(), agent_indices_to_reset))
        scores_num_delta = tf.count_nonzero(done, dtype=tf.int32)
      with tf.control_dependencies(save_ops +
                                   [scores_sum_delta, scores_num_delta]):
        reset_env_op = batch_env.reset(agent_indices_to_reset)
        reset_cumulative_rewards_op = tf.scatter_update(
            cumulative_rewards, agent_indices_to_reset,
            tf.gather(zeros_tensor, agent_indices_to_reset))
      with tf.control_dependencies([reset_env_op, reset_cumulative_rewards_op]):
        return [
            index + 1, scores_sum + scores_sum_delta,
            scores_num + scores_num_delta
        ]

    def stop_condition(i, _, resets):
      # In eval, run until every agent has finished an episode; otherwise run
      # for a fixed number of epoch_length steps.
      return tf.cond(eval_phase_t, lambda: resets < num_agents,
                     lambda: i < epoch_length)

    init = [tf.constant(0), tf.constant(0.0), tf.constant(0)]
    index, scores_sum, scores_num = tf.while_loop(
        stop_condition, step, init, parallel_iterations=1, back_prop=False)

  # We handle force_beginning_resets differently. We assume that all envs are
  # reset at the end of an episode (though the reset actually happens at the
  # beginning of the next one).
  scores_num = tf.cond(force_beginning_resets,
                       lambda: scores_num + len(batch_env), lambda: scores_num)

  with tf.control_dependencies([scores_sum]):
    scores_sum = tf.cond(
        force_beginning_resets,
        lambda: scores_sum + tf.reduce_sum(cumulative_rewards.read_value()),
        lambda: scores_sum)

  mean_score = tf.cond(
      tf.greater(scores_num, 0),
      lambda: scores_sum / tf.cast(scores_num, tf.float32), lambda: 0.)
  printing = tf.Print(0, [mean_score, scores_sum, scores_num], "mean_score: ")
  with tf.control_dependencies([index, printing]):
    memory = [mem.read_value() for mem in memory]
    # When generating real data together with PPO training we must use a
    # single agent. For PPO to work we reshape the history as if it were
    # generated by real_ppo_effective_num_agents agents.
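    # For example (illustrative numbers): with num_agents=1, epoch_length=128
    # and effective_num_agents=8, each memory slot goes from [128, 1, ...]
    # through the first transpose to [1, 128, ...], the reshape to
    # [8, 16, ...], and the final transpose back to the [time, agents, ...]
    # layout [16, 8, ...].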
    if ppo_hparams.effective_num_agents is not None and not eval_phase:
      new_memory = []
      effective_num_agents = ppo_hparams.effective_num_agents
      assert epoch_length % ppo_hparams.effective_num_agents == 0, (
          "The rollout of ppo_hparams.epoch_length will be distributed among "
          "effective_num_agents agents.")
      new_epoch_length = int(epoch_length / effective_num_agents)
      for mem, info in zip(memory, rollout_metadata):
        shape, _, name = info
        new_shape = [effective_num_agents, new_epoch_length] + shape[1:]
        perm = list(range(len(shape) + 1))
        perm[0] = 1
        perm[1] = 0
        mem = tf.transpose(mem, perm=perm)
        mem = tf.reshape(mem, shape=new_shape)
        mem = tf.transpose(
            mem,
            perm=perm,
            name="collect_memory_%d_%s" % (new_epoch_length, name))
        new_memory.append(mem)
      memory = new_memory

    with tf.variable_scope(scope, reuse=tf.AUTO_REUSE):
      mean_score_summary = tf.cond(
          tf.greater(scores_num, 0),
          lambda: tf.summary.scalar("mean_score_this_iter", mean_score),
          str)  # `str` returns "", i.e. no summary when no episode finished.
      summaries = tf.summary.merge([
          mean_score_summary,
          tf.summary.scalar("episodes_finished_this_iter", scores_num)
      ])
      return memory, summaries, initialization_lambda
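
A minimal usage sketch (not part of the source) showing how the returned tuple
might be consumed; `batch_env`, `ppo_hparams`, and the argument values below
are illustrative assumptions standing in for whatever the surrounding PPO
training code provides:

import tensorflow as tf

# Hypothetical call; `batch_env` and `ppo_hparams` are assumed to exist.
memory, collect_summary, init_envs = _define_collect(
    batch_env,
    ppo_hparams,
    scope="collect",              # illustrative scope name
    frame_stack_size=4,           # illustrative value
    eval_phase=False,
    sampling_temp=1.0,
    force_beginning_resets=False)

with tf.Session() as sess:
  sess.run(tf.global_variables_initializer())
  init_envs(sess)  # calls initialize(sess) on every wrapped batch_env
  rollout_tensors, summary_proto = sess.run([memory, collect_summary])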