in tensor2tensor/rl/ppo_learner.py [0:0]
def _define_collect(batch_env, ppo_hparams, scope, frame_stack_size, eval_phase,
sampling_temp, force_beginning_resets,
distributional_size=1):
"""Collect trajectories.
Args:
batch_env: Batch environment.
ppo_hparams: PPO hparams, defined in tensor2tensor.models.research.rl.
scope: var scope.
frame_stack_size: Number of last observations to feed into the policy.
eval_phase: Whether this is an evaluation run. If True, collection stops once
the number of finished episodes reaches the number of agents instead of
after epoch_length steps.
sampling_temp: Sampling temperature for the policy.
force_beginning_resets: Whether to force-reset all environments at the
beginning of each collection run.
distributional_size: optional, number of buckets in distributional RL.
Returns:
A tuple (memory, summaries, initialization_lambda). memory is a tuple of
tensors (observations, rewards, dones, actions, pdfs, value_functions)
holding a rollout collected from the wrapped environment.
"""
epoch_length = ppo_hparams.epoch_length
to_initialize = []
with tf.variable_scope(scope, reuse=tf.AUTO_REUSE):
num_agents = batch_env.batch_size
to_initialize.append(batch_env)
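# Wrap the raw batch env so the policy sees a stack of the last
# `frame_stack_size` observations and so every transition is buffered by
# _MemoryWrapper for dequeuing below.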
wrappers = [(StackWrapper, {
"history": frame_stack_size
}), (_MemoryWrapper, {})]
rollout_metadata = None
speculum = None
for w in wrappers:
tf.logging.info("Applying wrapper %s(%s) to env %s." % (str(
w[0]), str(w[1]), str(batch_env)))
batch_env = w[0](batch_env, **w[1])
to_initialize.append(batch_env)
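# After wrapping, rollout_metadata describes the (shape, dtype, name) of each
# rollout field and speculum is _MemoryWrapper's queue of buffered transitions.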
rollout_metadata = _rollout_metadata(batch_env, distributional_size)
speculum = batch_env.speculum
def initialization_lambda(sess):
for batch_env in to_initialize:
batch_env.initialize(sess)
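# One persistent variable per rollout field, each holding `epoch_length`
# steps of data for all agents.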
memory = [
tf.get_variable( # pylint: disable=g-complex-comprehension
"collect_memory_%d_%s" % (epoch_length, name),
shape=[epoch_length] + shape,
dtype=dtype,
initializer=tf.zeros_initializer(),
trainable=False) for (shape, dtype, name) in rollout_metadata
]
cumulative_rewards = tf.get_variable(
"cumulative_rewards", len(batch_env), trainable=False)
eval_phase_t = tf.convert_to_tensor(eval_phase)
should_reset_var = tf.Variable(True, trainable=False)
zeros_tensor = tf.zeros(len(batch_env))
force_beginning_resets = tf.convert_to_tensor(force_beginning_resets)
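# Optionally reset all environments and zero the per-agent reward accumulators
# before collection starts; after the first run this only happens when
# force_beginning_resets is set.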
def reset_ops_group():
return tf.group(
batch_env.reset(tf.range(len(batch_env))),
tf.assign(cumulative_rewards, zeros_tensor))
reset_op = tf.cond(
tf.logical_or(should_reset_var.read_value(), force_beginning_resets),
reset_ops_group, tf.no_op)
with tf.control_dependencies([reset_op]):
reset_once_op = tf.assign(should_reset_var, False)
with tf.control_dependencies([reset_once_op]):
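# One collection step: run the policy/environment until _MemoryWrapper has
# buffered a transition, dequeue it, write it into `memory` at `index`, and
# reset any environments whose episodes just finished.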
def step(index, scores_sum, scores_num):
"""Single step."""
index %= epoch_length # Only needed in eval runs.
# Note: the only way to ensure a copy of a tensor is made is to run a
# simple operation on it. We are waiting for tf.copy:
# https://github.com/tensorflow/tensorflow/issues/11186
obs_copy = batch_env.observ + 0
value_fun_shape = (num_agents,)
if distributional_size > 1:
value_fun_shape = (num_agents, distributional_size)
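# Runs the policy on the copied observations, samples an action and steps the
# batch environment; the wrapped env enqueues the resulting transition into
# `speculum`.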
def env_step(arg1, arg2, arg3): # pylint: disable=unused-argument
"""Step of the environment."""
(logits, value_function) = get_policy(
obs_copy, ppo_hparams, batch_env.action_space, distributional_size
)
action = common_layers.sample_with_temperature(logits, sampling_temp)
action = tf.cast(action, tf.int32)
action = tf.reshape(action, shape=(num_agents,))
reward, done = batch_env.simulate(action)
pdf = tfp.distributions.Categorical(logits=logits).prob(action)
pdf = tf.reshape(pdf, shape=(num_agents,))
value_function = tf.reshape(value_function, shape=value_fun_shape)
done = tf.reshape(done, shape=(num_agents,))
with tf.control_dependencies([reward, done]):
return tf.identity(pdf), tf.identity(value_function), \
tf.identity(done)
# TODO(piotrmilos): the while_loop body is executed at most once,
# so it should be replaced with tf.cond.
pdf, value_function, top_level_done = tf.while_loop(
lambda _1, _2, _3: tf.equal(speculum.size(), 0),
env_step,
[
tf.constant(0.0, shape=(num_agents,)),
tf.constant(0.0, shape=value_fun_shape),
tf.constant(False, shape=(num_agents,))
],
parallel_iterations=1,
back_prop=False,
)
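# Pull the buffered transition and record the whole step (including the
# policy's pdf and value estimate) into the rollout memory.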
with tf.control_dependencies([pdf, value_function]):
obs, reward, done, action = speculum.dequeue()
to_save = [obs, reward, done, action, pdf, value_function]
save_ops = [
tf.scatter_update(memory_slot, index, value)
for memory_slot, value in zip(memory, to_save)
]
cumulate_rewards_op = cumulative_rewards.assign_add(reward)
agent_indices_to_reset = tf.where(top_level_done)[:, 0]
with tf.control_dependencies([cumulate_rewards_op]):
# TODO(piotrmilos): possibly we need cumulative_rewards.read_value()
scores_sum_delta = tf.reduce_sum(
tf.gather(cumulative_rewards.read_value(), agent_indices_to_reset))
scores_num_delta = tf.count_nonzero(done, dtype=tf.int32)
with tf.control_dependencies(save_ops +
[scores_sum_delta, scores_num_delta]):
reset_env_op = batch_env.reset(agent_indices_to_reset)
reset_cumulative_rewards_op = tf.scatter_update(
cumulative_rewards, agent_indices_to_reset,
tf.gather(zeros_tensor, agent_indices_to_reset))
with tf.control_dependencies([reset_env_op, reset_cumulative_rewards_op]):
return [
index + 1, scores_sum + scores_sum_delta,
scores_num + scores_num_delta
]
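# In eval mode collect until the number of finished episodes reaches the
# number of agents; otherwise collect exactly `epoch_length` steps.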
def stop_condition(i, _, resets):
return tf.cond(eval_phase_t, lambda: resets < num_agents,
lambda: i < epoch_length)
init = [tf.constant(0), tf.constant(0.0), tf.constant(0)]
index, scores_sum, scores_num = tf.while_loop(
stop_condition, step, init, parallel_iterations=1, back_prop=False)
# We handle force_beginning_resets differently. We assume that all envs are
# reset at the end of the epoch (though the reset actually happens at the
# beginning of the next one).
scores_num = tf.cond(force_beginning_resets,
lambda: scores_num + len(batch_env), lambda: scores_num)
with tf.control_dependencies([scores_sum]):
scores_sum = tf.cond(
force_beginning_resets,
lambda: scores_sum + tf.reduce_sum(cumulative_rewards.read_value()),
lambda: scores_sum)
mean_score = tf.cond(
tf.greater(scores_num, 0),
lambda: scores_sum / tf.cast(scores_num, tf.float32), lambda: 0.)
printing = tf.Print(0, [mean_score, scores_sum, scores_num], "mean_score: ")
with tf.control_dependencies([index, printing]):
memory = [mem.read_value() for mem in memory]
# When generating real data together with PPO training we must use a single
# agent. For PPO to work we reshape the history as if it had been generated
# by real_ppo_effective_num_agents agents.
if ppo_hparams.effective_num_agents is not None and not eval_phase:
new_memory = []
effective_num_agents = ppo_hparams.effective_num_agents
assert epoch_length % ppo_hparams.effective_num_agents == 0, (
"The rollout of ppo_hparams.epoch_length will be distributed amongst "
"effective_num_agents agents.")
new_epoch_length = int(epoch_length / effective_num_agents)
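# Each memory tensor is [epoch_length, num_agents, ...]; swap the first two
# axes, split the single long rollout into `effective_num_agents` chunks of
# length `new_epoch_length`, then swap back to time-major layout.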
for mem, info in zip(memory, rollout_metadata):
shape, _, name = info
new_shape = [effective_num_agents, new_epoch_length] + shape[1:]
perm = list(range(len(shape) + 1))
perm[0] = 1
perm[1] = 0
mem = tf.transpose(mem, perm=perm)
mem = tf.reshape(mem, shape=new_shape)
mem = tf.transpose(
mem,
perm=perm,
name="collect_memory_%d_%s" % (new_epoch_length, name))
new_memory.append(mem)
memory = new_memory
with tf.variable_scope(scope, reuse=tf.AUTO_REUSE):
mean_score_summary = tf.cond(
tf.greater(scores_num, 0),
lambda: tf.summary.scalar("mean_score_this_iter", mean_score), str)
summaries = tf.summary.merge([
mean_score_summary,
tf.summary.scalar("episodes_finished_this_iter", scores_num)
])
return memory, summaries, initialization_lambda