in MTRF/r3l/r3l/r3l_envs/inhand_env/base.py [0:0]
def get_obs_dict(self) -> Dict[str, np.ndarray]:
obs_dict = super().get_obs_dict()
# Object data
object_qpos, object_qvel = obs_dict["object_qpos"], obs_dict["object_qvel"]
object_xyz, object_quat = object_qpos[:3], object_qpos[3:]
object_euler = quat2euler(object_quat)
object_z_orientation = object_euler[2]
# Target data
# NOTE: Targets need to be set in the OptiTrack coordinate space for hardware
target_xyz = self._last_target_xyz.copy()
target_euler = self._last_target_euler.copy()
target_z_orientation = target_euler[2]
# TODO(justinvyu): Add an offset to the end of the Sawyer xyz
grasp_xyz = self.sim.data.get_site_xpos("grasp").copy()
if self.is_hardware:
grasp_xyz = obs_dict["mocap_pos"][:3]
relative_xyz = object_xyz - target_xyz
circle_dist = circle_distance(object_euler, target_euler)
hand_to_obj_dist = np.linalg.norm(grasp_xyz - object_xyz)
hand_to_target_dist = np.linalg.norm(grasp_xyz - target_xyz)
x, y, z = object_xyz
x_d, y_d = np.digitize(x, self.x_bins), np.digitize(y, self.y_bins)
hand_pose_dist = np.linalg.norm(obs_dict["dhand_qpos"] - self._pose_target)
circle_dist_mod_180 = np.array([
circle_distance_mod(object_z_orientation, target_z_orientation, mod=np.pi)
])
obs_dict.update(collections.OrderedDict((
# Object info
("object_xyz", object_xyz),
("object_xy_discrete", np.array([x_d, y_d])),
("object_quat", object_quat),
("object_euler", object_euler),
("object_z_orientation", object_z_orientation),
("object_z_orientation_cos", np.array([np.cos(object_z_orientation)])),
("object_z_orientation_sin", np.array([np.sin(object_z_orientation)])),
("object_qvel", object_qvel),
# Target info
("target_xyz", target_xyz),
("target_euler", target_euler),
("target_quat", euler2quat(target_euler)),
("target_z_orientation", target_z_orientation),
("target_z_orientation_cos", np.array([np.cos(target_z_orientation)])),
("target_z_orientation_sin", np.array([np.sin(target_z_orientation)])),
# Distances
("object_to_target_xyz_distance", np.linalg.norm(relative_xyz)),
("object_to_target_xy_distance", np.linalg.norm(relative_xyz[:2])),
("object_to_target_x_distance", np.abs(relative_xyz[0])),
("object_to_target_y_distance", np.abs(relative_xyz[1])),
("object_to_target_z_distance", np.abs(relative_xyz[2])),
("object_to_target_circle_distances", circle_dist),
("object_to_target_circle_distance", np.linalg.norm(circle_dist)),
("object_to_target_mod_120_circle_distance", np.array([0])),
("object_to_target_mod_180_circle_distance", circle_dist_mod_180),
("object_to_hand_xyz_distance", hand_to_obj_dist),
("target_to_hand_xyz_distance", hand_to_target_dist),
("pose_dist", hand_pose_dist),
# Relative vectors
("object_to_target_xyz", relative_xyz),
("object_to_hand_xyz", grasp_xyz - object_xyz),
("grasp_xyz", grasp_xyz),
)))
if self._wrapped_reset_policies:
obs_dict["reset_policy_xy_distance"] = self._reset_dist.copy()
# Add these as placeholders (to be populated for each object specifically)
# Valve specific observations
obs_dict["object_site1_to_target_site1_xyz_err"] = np.zeros(3)
obs_dict["object_site2_to_target_site2_xyz_err"] = np.zeros(3)
obs_dict["object_site3_to_target_site3_xyz_err"] = np.zeros(3)
obs_dict["object_top_angle_cos"] = np.array([0])
obs_dict["object_top_angle_sin"] = np.array([0])
obs_dict["object_top_angle"] = np.array([0])
obs_dict["target_top_angle_cos"] = np.array([0])
obs_dict["target_top_angle_sin"] = np.array([0])
obs_dict["target_top_angle"] = np.array([0])
# Rod/pipe specific observations
obs_dict["object_normal_to_target_normal_err"] = np.zeros(3)
obs_dict["object_parallel_to_target_parallel_err"] = np.zeros(3)
obs_dict["object_normal_to_target_normal_distance"] = np.zeros(1)
obs_dict["object_parallel_to_target_parallel_distance"] = np.zeros(1)
obs_dict['step_counter'] = np.array([0])
if not self.initializing and self.object_type == ObjectType.Valve3:
target1_xpos = self.sim.data.site_xpos[self.target_1].copy()
valve1_xpos = self.sim.data.site_xpos[self.valve_1].copy()
valve_dir = np.array(valve1_xpos - object_xyz)
# Project onto xy plane
valve_dir[2] = 0
# Unit vector in direction of the red prong
valve_dir /= np.linalg.norm(valve_dir)
target_dir = np.array(target1_xpos - target_xyz)
# Project onto xy plane
target_dir[2] = 0
# Unit vector in direction of the red prong
target_dir /= np.linalg.norm(target_dir)
object_angle = np.arctan2(valve_dir[1], valve_dir[0])
obs_dict["object_top_angle"] = np.array([object_angle])
obs_dict["object_top_angle_cos"] = np.array([valve_dir[0]])
obs_dict["object_top_angle_sin"] = np.array([valve_dir[1]])
target_angle = np.arctan2(target_dir[1], target_dir[0])
obs_dict["target_top_angle"] = np.array([target_angle])
obs_dict["target_top_angle_cos"] = np.array([target_dir[0]])
obs_dict["target_top_angle_sin"] = np.array([target_dir[1]])
obs_dict["object_to_target_mod_120_circle_distance"] = np.array([
circle_distance_mod(object_angle, target_angle, mod=(2 * np.pi / 3))
])
# Calculating site distances (of closest matching sites)
perm_1_dist = np.mean([
self.get_site_distance(self.target_1, self.valve_1),
self.get_site_distance(self.target_2, self.valve_2),
self.get_site_distance(self.target_3, self.valve_3)
])
perm_2_dist = np.mean([
self.get_site_distance(self.target_1, self.valve_2),
self.get_site_distance(self.target_2, self.valve_3),
self.get_site_distance(self.target_3, self.valve_1)
])
perm_3_dist = np.mean([
self.get_site_distance(self.target_1, self.valve_3),
self.get_site_distance(self.target_2, self.valve_1),
self.get_site_distance(self.target_3, self.valve_2)
])
if perm_1_dist <= perm_2_dist and perm_1_dist <= perm_3_dist:
# Use the first permutation because it is the closest
valve_1 = self.valve_1
valve_2 = self.valve_2
valve_3 = self.valve_3
object_to_target_dist = perm_1_dist
elif perm_2_dist <= perm_1_dist and perm_2_dist <= perm_3_dist:
# Use the 2nd
valve_1 = self.valve_2
valve_2 = self.valve_3
valve_3 = self.valve_1
object_to_target_dist = perm_2_dist
elif perm_3_dist <= perm_1_dist and perm_3_dist <= perm_2_dist:
# Use the 3rd
valve_1 = self.valve_3
valve_2 = self.valve_1
valve_3 = self.valve_2
object_to_target_dist = perm_3_dist
obs_dict["object_sites_to_target_sites_distance"] = object_to_target_dist
obs_dict["object_site1_to_target_site1_xyz_err"] = self.sim.data.site_xpos[self.target_1] - self.sim.data.site_xpos[valve_1]
obs_dict["object_site2_to_target_site2_xyz_err"] = self.sim.data.site_xpos[self.target_2] - self.sim.data.site_xpos[valve_2]
obs_dict["object_site3_to_target_site3_xyz_err"] = self.sim.data.site_xpos[self.target_3] - self.sim.data.site_xpos[valve_3]
if not self.initializing and self.object_type in (ObjectType.Rod, ObjectType.Pipe):
object_center = self.sim.data.site_xpos[self.rod_center]
target_center = self.sim.data.site_xpos[self.target_center]
object_normal = self.sim.data.site_xpos[self.rod_normal]
object_parallel = self.sim.data.site_xpos[self.rod_parallel]
target_normal = self.sim.data.site_xpos[self.target_normal]
target_parallel = self.sim.data.site_xpos[self.target_parallel]
object_normal_vec = obs_dict["object_normal_vector"] = object_normal - object_center
target_normal_vec = obs_dict["target_normal_vector"] = target_normal - target_center
object_parallel_vec = obs_dict["object_parallel_vector"] = object_parallel - object_center
target_parallel_vec = obs_dict["target_parallel_vector"] = target_parallel - target_center
normal_err = obs_dict["object_normal_to_target_normal_err"] = object_normal_vec - target_normal_vec
parallel_err = obs_dict["object_parallel_to_target_parallel_err"] = object_parallel_vec - target_parallel_vec
obs_dict["object_normal_to_target_normal_distance"] = np.linalg.norm(normal_err)
obs_dict["object_parallel_to_target_parallel_distance"] = np.linalg.norm(parallel_err)
return obs_dict