mujoco_py/mjsim.pyx (249 lines of code) (raw):

from xml.dom import minidom from mujoco_py.utils import remove_empty_lines from mujoco_py.builder import build_callback_fn from threading import Lock _MjSim_render_lock = Lock() ctypedef void (*substep_udd_t)(const mjModel* m, mjData* d) cdef class MjSim(object): """MjSim represents a running simulation including its state. Similar to Gym's ``MujocoEnv``, it internally wraps a :class:`.PyMjModel` and a :class:`.PyMjData`. Parameters ---------- model : :class:`.PyMjModel` The model to simulate. data : :class:`.PyMjData` Optional container for the simulation state. Will be created if ``None``. nsubsteps : int Optional number of MuJoCo steps to run for every call to :meth:`.step`. Buffers will be swapped only once per step. udd_callback : fn(:class:`.MjSim`) -> dict Optional callback for user-defined dynamics. At every call to :meth:`.step`, it receives an MjSim object ``sim`` containing the current user-defined dynamics state in ``sim.udd_state``, and returns the next ``udd_state`` after applying the user-defined dynamics. This is useful e.g. for reward functions that operate over functions of historical state. substep_callback : str or int or None This uses a compiled C function as user-defined dynamics in substeps. If given as a string, it's compiled as a C function and set as pointer. If given as int, it's interpreted as a function pointer. See :meth:`.set_substep_callback` for detailed info. userdata_names : list of strings or None This is a convenience parameter which is just set on the model. Equivalent to calling ``model.set_userdata_names`` render_callback : callback for rendering. """ # MjRenderContext for rendering camera views. cdef readonly list render_contexts cdef readonly object _render_context_window cdef readonly object _render_context_offscreen # MuJoCo model cdef readonly PyMjModel model # MuJoCo data """ DATAZ """ cdef readonly PyMjData data # Number of substeps when calling .step cdef public int nsubsteps # User defined state. cdef public dict udd_state # User defined dynamics callback cdef readonly object _udd_callback # Allows to store extra information in MjSim. cdef readonly dict extras # Function pointer for substep callback, stored as uintptr cdef readonly uintptr_t substep_callback_ptr # Callback executed before rendering. cdef public object render_callback def __cinit__(self, PyMjModel model, PyMjData data=None, int nsubsteps=1, udd_callback=None, substep_callback=None, userdata_names=None, render_callback=None): self.nsubsteps = nsubsteps self.model = model if data is None: with wrap_mujoco_warning(): _data = mj_makeData(self.model.ptr) if _data == NULL: raise Exception('mj_makeData failed!') self.data = WrapMjData(_data, self.model) else: self.data = data self.render_contexts = [] self._render_context_offscreen = None self._render_context_window = None self.udd_state = None self.udd_callback = udd_callback self.render_callback = render_callback self.extras = {} self.set_substep_callback(substep_callback, userdata_names) def reset(self): """ Resets the simulation data and clears buffers. """ with wrap_mujoco_warning(): mj_resetData(self.model.ptr, self.data.ptr) self.udd_state = None self.step_udd() def forward(self): """ Computes the forward kinematics. Calls ``mj_forward`` internally. """ with wrap_mujoco_warning(): mj_forward(self.model.ptr, self.data.ptr) def set_constants(self): """ Set constant fields of mjModel, corresponding to qpos0 configuration. """ with wrap_mujoco_warning(): mj_setConst(self.model.ptr, self.data.ptr) def step(self, with_udd=True): """ Advances the simulation by calling ``mj_step``. If ``qpos`` or ``qvel`` have been modified directly, the user is required to call :meth:`.forward` before :meth:`.step` if their ``udd_callback`` requires access to MuJoCo state set during the forward dynamics. """ if with_udd: self.step_udd() with wrap_mujoco_warning(): for _ in range(self.nsubsteps): self.substep_callback() mj_step(self.model.ptr, self.data.ptr) def render(self, width=None, height=None, *, camera_name=None, depth=False, mode='offscreen', device_id=-1, segmentation=False): """ Renders view from a camera and returns image as an `numpy.ndarray`. Args: - width (int): desired image width. - height (int): desired image height. - camera_name (str): name of camera in model. If None, the free camera will be used. - depth (bool): if True, also return depth buffer - device (int): device to use for rendering (only for GPU-backed rendering). Returns: - rgb (uint8 array): image buffer from camera - depth (float array): depth buffer from camera (only returned if depth=True) """ if camera_name is None: camera_id = None else: camera_id = self.model.camera_name2id(camera_name) if mode == 'offscreen': with _MjSim_render_lock: if self._render_context_offscreen is None: render_context = MjRenderContextOffscreen( self, device_id=device_id) else: render_context = self._render_context_offscreen render_context.render( width=width, height=height, camera_id=camera_id, segmentation=segmentation) return render_context.read_pixels( width, height, depth=depth, segmentation=segmentation) elif mode == 'window': if self._render_context_window is None: from mujoco_py.mjviewer import MjViewer render_context = MjViewer(self) else: render_context = self._render_context_window render_context.render() else: raise ValueError("Mode must be either 'window' or 'offscreen'.") def add_render_context(self, render_context): self.render_contexts.append(render_context) if render_context.offscreen and self._render_context_offscreen is None: self._render_context_offscreen = render_context elif not render_context.offscreen and self._render_context_window is None: self._render_context_window = render_context @property def udd_callback(self): return self._udd_callback @udd_callback.setter def udd_callback(self, value): self._udd_callback = value self.udd_state = None self.step_udd() cpdef substep_callback(self): if self.substep_callback_ptr: (<mjfGeneric>self.substep_callback_ptr)(self.model.ptr, self.data.ptr) def set_substep_callback(self, substep_callback, userdata_names=None): ''' Set a substep callback function. Parameters : substep_callback : str or int or None If `substep_callback` is a string, compile to function pointer and set. See `builder.build_callback_fn()` for documentation. If `substep_callback` is an int, we interpret it as a function pointer. If `substep_callback` is None, we disable substep_callbacks. userdata_names : list of strings or None This is a convenience parameter, if not None, this is passed onto ``model.set_userdata_names()``. ''' if userdata_names is not None: self.model.set_userdata_names(userdata_names) if substep_callback is None: self.substep_callback_ptr = 0 elif isinstance(substep_callback, int): self.substep_callback_ptr = substep_callback elif isinstance(substep_callback, str): self.substep_callback_ptr = build_callback_fn(substep_callback, self.model.userdata_names) else: raise TypeError('invalid: {}'.format(type(substep_callback))) def step_udd(self): if self._udd_callback is None: self.udd_state = {} else: schema_example = self.udd_state self.udd_state = self._udd_callback(self) # Check to make sure the udd_state has consistent keys and dimension across steps if schema_example is not None: keys = set(schema_example.keys()) | set(self.udd_state.keys()) for key in keys: assert key in schema_example, "Keys cannot be added to udd_state between steps." assert key in self.udd_state, "Keys cannot be dropped from udd_state between steps." if isinstance(schema_example[key], Number): assert isinstance(self.udd_state[key], Number), \ "Every value in udd_state must be either a number or a numpy array" else: assert isinstance(self.udd_state[key], np.ndarray), \ "Every value in udd_state must be either a number or a numpy array" assert self.udd_state[key].shape == schema_example[key].shape, \ "Numpy array values in udd_state must keep the same dimension across steps." def get_state(self): """ Returns a copy of the simulator state. """ qpos = np.copy(self.data.qpos) qvel = np.copy(self.data.qvel) if self.model.na == 0: act = None else: act = np.copy(self.data.act) udd_state = copy.deepcopy(self.udd_state) return MjSimState(self.data.time, qpos, qvel, act, udd_state) def set_state(self, value): """ Sets the state from an MjSimState. If the MjSimState was previously unflattened from a numpy array, consider set_state_from_flattened, as the defensive copy is a substantial overhead in an inner loop. Args: - value (MjSimState): the desired state. - call_forward: optionally call sim.forward(). Called by default if the udd_callback is set. """ self.data.time = value.time self.data.qpos[:] = np.copy(value.qpos) self.data.qvel[:] = np.copy(value.qvel) if self.model.na != 0: self.data.act[:] = np.copy(value.act) self.udd_state = copy.deepcopy(value.udd_state) def set_state_from_flattened(self, value): """ This helper method sets the state from an array without requiring a defensive copy.""" state = MjSimState.from_flattened(value, self) self.data.time = state.time self.data.qpos[:] = state.qpos self.data.qvel[:] = state.qvel if self.model.na != 0: self.data.act[:] = state.act self.udd_state = state.udd_state def save(self, file, format='xml', keep_inertials=False): """ Saves the simulator model and state to a file as either a MuJoCo XML or MJB file. The current state is saved as a keyframe in the model file. This is useful for debugging using MuJoCo's `simulate` utility. Note that this doesn't save the UDD-state which is part of MjSimState, since that's not supported natively by MuJoCo. If you want to save the model together with the UDD-state, you should use the `get_xml` or `get_mjb` methods on `MjModel` together with `MjSim.get_state` and save them with e.g. pickle. Args: - file (IO stream): stream to write model to. - format: format to use (either 'xml' or 'mjb') - keep_inertials (bool): if False, removes all <inertial> properties derived automatically for geoms by MuJoco. Note that this removes ones that were provided by the user as well. """ xml_str = self.model.get_xml() dom = minidom.parseString(xml_str) mujoco_node = dom.childNodes[0] assert mujoco_node.tagName == 'mujoco' keyframe_el = dom.createElement('keyframe') key_el = dom.createElement('key') keyframe_el.appendChild(key_el) mujoco_node.appendChild(keyframe_el) def str_array(arr): return " ".join(map(str, arr)) key_el.setAttribute('time', str(self.data.time)) key_el.setAttribute('qpos', str_array(self.data.qpos)) key_el.setAttribute('qvel', str_array(self.data.qvel)) if self.data.act is not None: key_el.setAttribute('act', str_array(self.data.act)) if not keep_inertials: for element in dom.getElementsByTagName('inertial'): element.parentNode.removeChild(element) result_xml = remove_empty_lines(dom.toprettyxml(indent=" " * 4)) if format == 'xml': file.write(result_xml) elif format == 'mjb': new_model = load_model_from_xml(result_xml) file.write(new_model.get_mjb()) else: raise ValueError("Unsupported format. Valid ones are 'xml' and 'mjb'") def ray(self, pnt, vec, include_static_geoms=True, exclude_body=-1, group_filter=None): """ Cast a ray into the scene, and return the first valid geom it intersects. pnt - origin point of the ray in world coordinates (X Y Z) vec - direction of the ray in world coordinates (X Y Z) include_static_geoms - if False, we exclude geoms that are children of worldbody. exclude_body - if this is a body ID, we exclude all children geoms of this body. group_filter - a vector of booleans of length const.NGROUP which specifies what geom groups (stored in model.geom_group) to enable or disable. If none, all groups are used Returns (distance, geomid) where distance - distance along ray until first collision with geom geomid - id of the geom the ray collided with If no collision was found in the scene, return (-1, None) NOTE: sometimes self.forward() needs to be called before self.ray(). See self.ray_fast_group() and self.ray_fast_nogroup() for versions of this call with more stringent type requirements. """ cdef mjtNum distance cdef mjtNum[::view.contiguous] pnt_view = pnt cdef mjtNum[::view.contiguous] vec_view = vec if group_filter is None: return self.ray_fast_nogroup( np.asarray(pnt, dtype=np.float64), np.asarray(vec, dtype=np.float64), 1 if include_static_geoms else 0, exclude_body) else: return self.ray_fast_group( np.asarray(pnt, dtype=np.float64), np.asarray(vec, dtype=np.float64), np.asarray(group_filter, dtype=np.uint8), 1 if include_static_geoms else 0, exclude_body) def ray_fast_group(self, np.ndarray[np.float64_t, mode="c", ndim=1] pnt, np.ndarray[np.float64_t, mode="c", ndim=1] vec, np.ndarray[np.uint8_t, mode="c", ndim=1] geomgroup, mjtByte flg_static=1, int bodyexclude=-1): """ Faster version of sim.ray(), which avoids extra copies, but needs to be given all the correct type arrays. See self.ray() for explanation of arguments """ cdef int geomid cdef mjtNum distance cdef mjtNum[::view.contiguous] pnt_view = pnt cdef mjtNum[::view.contiguous] vec_view = vec cdef mjtByte[::view.contiguous] geomgroup_view = geomgroup distance = mj_ray(self.model.ptr, self.data.ptr, &pnt_view[0], &vec_view[0], &geomgroup_view[0], flg_static, bodyexclude, &geomid) return (distance, geomid) def ray_fast_nogroup(self, np.ndarray[np.float64_t, mode="c", ndim=1] pnt, np.ndarray[np.float64_t, mode="c", ndim=1] vec, mjtByte flg_static=1, int bodyexclude=-1): """ Faster version of sim.ray(), which avoids extra copies, but needs to be given all the correct type arrays. This version hardcodes the geomgroup to NULL. (Can't easily express a signature that is "numpy array of specific type or None") See self.ray() for explanation of arguments """ cdef int geomid cdef mjtNum distance cdef mjtNum[::view.contiguous] pnt_view = pnt cdef mjtNum[::view.contiguous] vec_view = vec distance = mj_ray(self.model.ptr, self.data.ptr, &pnt_view[0], &vec_view[0], NULL, flg_static, bodyexclude, &geomid) return (distance, geomid)