in Experiments/PolicyManagers.py
def get_trajectory_segment(self, i):

    if self.args.data in ['Continuous', 'ContinuousDir', 'ContinuousNonZero', 'ContinuousDirNZ', 'GoalDirected', 'Separable']:
        # Sample trajectory segment from dataset.
        sample_traj, sample_action_seq = self.dataset[i]

        # Subsample trajectory segment.
        start_timepoint = np.random.randint(0, self.args.traj_length - self.traj_length)
        end_timepoint = start_timepoint + self.traj_length

        # The trajectory is one step longer than the action sequence, because actions are constructed
        # from state differences. Slice the action sequence to end one step earlier than the trajectory.
        sample_traj = sample_traj[start_timepoint:end_timepoint]
        sample_action_seq = sample_action_seq[start_timepoint:end_timepoint-1]

        self.current_traj_len = self.traj_length

        # Build the concatenated trajectory as {(s0,_), (s1,a0), (s2,a1), ..., (sn,an-1)}.
        concatenated_traj = self.concat_state_action(sample_traj, sample_action_seq)

        return concatenated_traj, sample_action_seq, sample_traj
    elif self.args.data in ['MIME', 'Roboturk', 'OrigRoboturk', 'FullRoboturk', 'Mocap']:

        data_element = self.dataset[i]

        # If the data element is invalid, skip it.
        if not data_element['is_valid']:
            return None, None, None

        # if self.args.data=='MIME':
        #     # Sample a trajectory length that's valid.
        #     trajectory = np.concatenate([data_element['la_trajectory'],data_element['ra_trajectory'],data_element['left_gripper'].reshape((-1,1)),data_element['right_gripper'].reshape((-1,1))],axis=-1)
        # elif self.args.data=='Roboturk':
        #     trajectory = data_element['demo']

        # Keep or drop the gripper dimension depending on the flag.
        if self.args.gripper:
            trajectory = data_element['demo']
        else:
            trajectory = data_element['demo'][:, :-1]
        # If allowing variable skill length, set the length for this sample.
        if self.args.var_skill_length:
            # Choose a segment length of 12-16, weighted towards 14.
            self.current_traj_len = np.random.choice([12, 13, 14, 15, 16], p=[0.1, 0.2, 0.4, 0.2, 0.1])
        else:
            self.current_traj_len = self.traj_length

        # Sample a random start point.
        if trajectory.shape[0] > self.current_traj_len:

            bias_length = int(self.args.pretrain_bias_sampling*trajectory.shape[0])

            # Probability with which to sample a biased segment:
            sample_biased_segment = np.random.binomial(1, p=self.args.pretrain_bias_sampling_prob)

            # Biasing segment sampling towards the middle of the trajectory increases the proportion of
            # segments that capture motions other than reaching and returning.
            # Sample a biased segment only if the trajectory is long enough, and with the probability above.
            if ((trajectory.shape[0]-2*bias_length) > self.current_traj_len) and sample_biased_segment:
                start_timepoint = np.random.randint(bias_length, trajectory.shape[0] - self.current_traj_len - bias_length)
            else:
                start_timepoint = np.random.randint(0, trajectory.shape[0]-self.current_traj_len)

            end_timepoint = start_timepoint + self.current_traj_len
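            # Illustrative numbers (not from the code): with a 100-step trajectory and
            # pretrain_bias_sampling=0.2, bias_length=20, so biased start points are drawn from
            # [20, 80 - current_traj_len), keeping the segment away from the first and last 20 steps.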
            # Get the trajectory segment.
            trajectory = trajectory[start_timepoint:end_timepoint]

            # Normalize if a normalization scheme is specified.
            if self.args.normalization in ['meanvar', 'minmax']:
                trajectory = (trajectory - self.norm_sub_value) / self.norm_denom_value

            # Conditional information for the encoder.
            if self.args.data in ['MIME', 'Mocap']:
                pass
            elif self.args.data in ['Roboturk', 'OrigRoboturk', 'FullRoboturk']:
                # robot_states = data_element['robot-state'][start_timepoint:end_timepoint]
                # object_states = data_element['object-state'][start_timepoint:end_timepoint]
                pass

                # self.conditional_information = np.zeros((len(trajectory),self.conditional_info_size))
                # self.conditional_information[:,:self.cond_robot_state_size] = robot_states
                # self.conditional_information[:,self.cond_robot_state_size:object_states.shape[-1]] = object_states
                # conditional_info = np.concatenate([robot_states,object_states],axis=1)

        else:
            return None, None, None
        # Actions are state differences.
        action_sequence = np.diff(trajectory, axis=0)

        # Concatenate states and actions.
        concatenated_traj = self.concat_state_action(trajectory, action_sequence)

        # Scale the action sequence by the configured factor.
        scaled_action_sequence = self.args.action_scale_factor*action_sequence

        return concatenated_traj, scaled_action_sequence, trajectory
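
# Usage sketch (illustrative, not part of the original file): shows how driver code might
# consume get_trajectory_segment(). The `policy_manager` object, `num_samples` argument, and
# this helper's name are hypothetical stand-ins; it also assumes the dataset supports len().
import numpy as np  # mirrors the numpy import PolicyManagers.py already relies on

def sample_valid_segments(policy_manager, num_samples):
    segments = []
    while len(segments) < num_samples:
        i = np.random.randint(len(policy_manager.dataset))
        concatenated_traj, action_seq, traj = policy_manager.get_trajectory_segment(i)
        # Invalid data elements and too-short trajectories come back as (None, None, None); skip them.
        if concatenated_traj is None:
            continue
        segments.append((concatenated_traj, action_seq, traj))
    return segments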