def get_obs_dict()

in MTRF/r3l/r3l/r3l_envs/inhand_env/base.py [0:0]


    def get_obs_dict(self) -> Dict[str, np.ndarray]:
        obs_dict = super().get_obs_dict()
        # Object data
        object_qpos, object_qvel = obs_dict["object_qpos"], obs_dict["object_qvel"]
        object_xyz, object_quat = object_qpos[:3], object_qpos[3:]
        object_euler = quat2euler(object_quat)
        object_z_orientation = object_euler[2]

        # Target data
        # NOTE: Targets need to be set in the OptiTrack coordinate space for hardware
        target_xyz = self._last_target_xyz.copy()
        target_euler = self._last_target_euler.copy()
        target_z_orientation = target_euler[2]

        # TODO(justinvyu): Add an offset to the end of the Sawyer xyz
        grasp_xyz = self.sim.data.get_site_xpos("grasp").copy()
        if self.is_hardware:
            grasp_xyz = obs_dict["mocap_pos"][:3]

        relative_xyz = object_xyz - target_xyz
        circle_dist = circle_distance(object_euler, target_euler)
        hand_to_obj_dist = np.linalg.norm(grasp_xyz - object_xyz)
        hand_to_target_dist = np.linalg.norm(grasp_xyz - target_xyz)

        x, y, z = object_xyz
        x_d, y_d = np.digitize(x, self.x_bins), np.digitize(y, self.y_bins)

        hand_pose_dist = np.linalg.norm(obs_dict["dhand_qpos"] - self._pose_target)

        circle_dist_mod_180 = np.array([
            circle_distance_mod(object_z_orientation, target_z_orientation, mod=np.pi)
        ])

        obs_dict.update(collections.OrderedDict((
            # Object info
            ("object_xyz", object_xyz),
            ("object_xy_discrete", np.array([x_d, y_d])),
            ("object_quat", object_quat),
            ("object_euler", object_euler),
            ("object_z_orientation", object_z_orientation),
            ("object_z_orientation_cos", np.array([np.cos(object_z_orientation)])),
            ("object_z_orientation_sin", np.array([np.sin(object_z_orientation)])),
            ("object_qvel", object_qvel),

            # Target info
            ("target_xyz", target_xyz),
            ("target_euler", target_euler),
            ("target_quat", euler2quat(target_euler)),
            ("target_z_orientation", target_z_orientation),
            ("target_z_orientation_cos", np.array([np.cos(target_z_orientation)])),
            ("target_z_orientation_sin", np.array([np.sin(target_z_orientation)])),

            # Distances
            ("object_to_target_xyz_distance", np.linalg.norm(relative_xyz)),
            ("object_to_target_xy_distance", np.linalg.norm(relative_xyz[:2])),
            ("object_to_target_x_distance", np.abs(relative_xyz[0])),
            ("object_to_target_y_distance", np.abs(relative_xyz[1])),
            ("object_to_target_z_distance", np.abs(relative_xyz[2])),
            ("object_to_target_circle_distances", circle_dist),
            ("object_to_target_circle_distance", np.linalg.norm(circle_dist)),
            ("object_to_target_mod_120_circle_distance", np.array([0])),
            ("object_to_target_mod_180_circle_distance", circle_dist_mod_180),
            ("object_to_hand_xyz_distance", hand_to_obj_dist),
            ("target_to_hand_xyz_distance", hand_to_target_dist),
            ("pose_dist", hand_pose_dist),

            # Relative vectors
            ("object_to_target_xyz", relative_xyz),
            ("object_to_hand_xyz", grasp_xyz - object_xyz),
            ("grasp_xyz", grasp_xyz),
        )))
        if self._wrapped_reset_policies:
            obs_dict["reset_policy_xy_distance"] = self._reset_dist.copy()

        # Add these as placeholders (to be populated for each object specifically)

        # Valve specific observations
        obs_dict["object_site1_to_target_site1_xyz_err"] = np.zeros(3)
        obs_dict["object_site2_to_target_site2_xyz_err"] = np.zeros(3)
        obs_dict["object_site3_to_target_site3_xyz_err"] = np.zeros(3)
        obs_dict["object_top_angle_cos"] = np.array([0])
        obs_dict["object_top_angle_sin"] = np.array([0])
        obs_dict["object_top_angle"] = np.array([0])
        obs_dict["target_top_angle_cos"] = np.array([0])
        obs_dict["target_top_angle_sin"] = np.array([0])
        obs_dict["target_top_angle"] = np.array([0])

        # Rod/pipe specific observations
        obs_dict["object_normal_to_target_normal_err"] = np.zeros(3)
        obs_dict["object_parallel_to_target_parallel_err"] = np.zeros(3)
        obs_dict["object_normal_to_target_normal_distance"] = np.zeros(1)
        obs_dict["object_parallel_to_target_parallel_distance"] = np.zeros(1)

        obs_dict['step_counter'] = np.array([0])

        if not self.initializing and self.object_type == ObjectType.Valve3:
            target1_xpos = self.sim.data.site_xpos[self.target_1].copy()
            valve1_xpos = self.sim.data.site_xpos[self.valve_1].copy()

            valve_dir = np.array(valve1_xpos - object_xyz)
            # Project onto xy plane
            valve_dir[2] = 0
            # Unit vector in direction of the red prong
            valve_dir /= np.linalg.norm(valve_dir)

            target_dir = np.array(target1_xpos - target_xyz)
            # Project onto xy plane
            target_dir[2] = 0
            # Unit vector in direction of the red prong
            target_dir /= np.linalg.norm(target_dir)

            object_angle = np.arctan2(valve_dir[1], valve_dir[0])
            obs_dict["object_top_angle"] = np.array([object_angle])
            obs_dict["object_top_angle_cos"] = np.array([valve_dir[0]])
            obs_dict["object_top_angle_sin"] = np.array([valve_dir[1]])
            target_angle = np.arctan2(target_dir[1], target_dir[0])
            obs_dict["target_top_angle"] = np.array([target_angle])
            obs_dict["target_top_angle_cos"] = np.array([target_dir[0]])
            obs_dict["target_top_angle_sin"] = np.array([target_dir[1]])

            obs_dict["object_to_target_mod_120_circle_distance"] = np.array([
                circle_distance_mod(object_angle, target_angle, mod=(2 * np.pi / 3))
            ])

            # Calculating site distances (of closest matching sites)
            perm_1_dist = np.mean([
                self.get_site_distance(self.target_1, self.valve_1),
                self.get_site_distance(self.target_2, self.valve_2),
                self.get_site_distance(self.target_3, self.valve_3)
            ])
            perm_2_dist = np.mean([
                self.get_site_distance(self.target_1, self.valve_2),
                self.get_site_distance(self.target_2, self.valve_3),
                self.get_site_distance(self.target_3, self.valve_1)
            ])
            perm_3_dist = np.mean([
                self.get_site_distance(self.target_1, self.valve_3),
                self.get_site_distance(self.target_2, self.valve_1),
                self.get_site_distance(self.target_3, self.valve_2)
            ])

            if perm_1_dist <= perm_2_dist and perm_1_dist <= perm_3_dist:
                # Use the first permutation because it is the closest
                valve_1 = self.valve_1
                valve_2 = self.valve_2
                valve_3 = self.valve_3
                object_to_target_dist = perm_1_dist
            elif perm_2_dist <= perm_1_dist and perm_2_dist <= perm_3_dist:
                # Use the 2nd
                valve_1 = self.valve_2
                valve_2 = self.valve_3
                valve_3 = self.valve_1
                object_to_target_dist = perm_2_dist
            elif perm_3_dist <= perm_1_dist and perm_3_dist <= perm_2_dist:
                # Use the 3rd
                valve_1 = self.valve_3
                valve_2 = self.valve_1
                valve_3 = self.valve_2
                object_to_target_dist = perm_3_dist

            obs_dict["object_sites_to_target_sites_distance"] = object_to_target_dist
            obs_dict["object_site1_to_target_site1_xyz_err"] = self.sim.data.site_xpos[self.target_1] - self.sim.data.site_xpos[valve_1]
            obs_dict["object_site2_to_target_site2_xyz_err"] = self.sim.data.site_xpos[self.target_2] - self.sim.data.site_xpos[valve_2]
            obs_dict["object_site3_to_target_site3_xyz_err"] = self.sim.data.site_xpos[self.target_3] - self.sim.data.site_xpos[valve_3]

        if not self.initializing and self.object_type in (ObjectType.Rod, ObjectType.Pipe):
            object_center = self.sim.data.site_xpos[self.rod_center]
            target_center = self.sim.data.site_xpos[self.target_center]
            object_normal = self.sim.data.site_xpos[self.rod_normal]
            object_parallel = self.sim.data.site_xpos[self.rod_parallel]
            target_normal = self.sim.data.site_xpos[self.target_normal]
            target_parallel = self.sim.data.site_xpos[self.target_parallel]

            object_normal_vec = obs_dict["object_normal_vector"] = object_normal - object_center
            target_normal_vec = obs_dict["target_normal_vector"] = target_normal - target_center
            object_parallel_vec = obs_dict["object_parallel_vector"] = object_parallel - object_center
            target_parallel_vec = obs_dict["target_parallel_vector"] = target_parallel - target_center

            normal_err = obs_dict["object_normal_to_target_normal_err"] = object_normal_vec - target_normal_vec
            parallel_err = obs_dict["object_parallel_to_target_parallel_err"] = object_parallel_vec - target_parallel_vec
            obs_dict["object_normal_to_target_normal_distance"] = np.linalg.norm(normal_err)
            obs_dict["object_parallel_to_target_parallel_distance"] = np.linalg.norm(parallel_err)

        return obs_dict