mujoco_py/mjrendercontext.pyx (249 lines of code) (raw):

from threading import Lock from mujoco_py.generated import const import numpy as np cimport numpy as np cdef class MjRenderContext(object): """ Class that encapsulates rendering functionality for a MuJoCo simulation. """ cdef mjModel *_model_ptr cdef mjData *_data_ptr cdef mjvScene _scn cdef mjvCamera _cam cdef mjvOption _vopt cdef mjvPerturb _pert cdef mjrContext _con # Public wrappers cdef readonly PyMjvScene scn cdef readonly PyMjvCamera cam cdef readonly PyMjvOption vopt cdef readonly PyMjvPerturb pert cdef readonly PyMjrContext con cdef readonly object opengl_context cdef readonly int _visible cdef readonly list _markers cdef readonly dict _overlay cdef readonly bint offscreen cdef public object sim def __cinit__(self): maxgeom = 1000 mjv_makeScene(self._model_ptr, &self._scn, maxgeom) mjv_defaultCamera(&self._cam) mjv_defaultPerturb(&self._pert) mjv_defaultOption(&self._vopt) mjr_defaultContext(&self._con) def __init__(self, MjSim sim, bint offscreen=True, int device_id=-1, opengl_backend=None, quiet=False): self.sim = sim self._setup_opengl_context(offscreen, device_id, opengl_backend, quiet=quiet) self.offscreen = offscreen # Ensure the model data has been updated so that there # is something to render sim.forward() sim.add_render_context(self) self._model_ptr = sim.model.ptr self._data_ptr = sim.data.ptr self.scn = WrapMjvScene(&self._scn) self.cam = WrapMjvCamera(&self._cam) self.vopt = WrapMjvOption(&self._vopt) self.con = WrapMjrContext(&self._con) self._pert.active = 0 self._pert.select = 0 self._pert.skinselect = -1 self.pert = WrapMjvPerturb(&self._pert) self._markers = [] self._overlay = {} self._init_camera(sim) self._set_mujoco_buffers() def update_sim(self, MjSim new_sim): if new_sim == self.sim: return self._model_ptr = new_sim.model.ptr self._data_ptr = new_sim.data.ptr self._set_mujoco_buffers() for render_context in self.sim.render_contexts: new_sim.add_render_context(render_context) self.sim = new_sim def _set_mujoco_buffers(self): mjr_makeContext(self._model_ptr, &self._con, mjFONTSCALE_150) if self.offscreen: mjr_setBuffer(mjFB_OFFSCREEN, &self._con); if self._con.currentBuffer != mjFB_OFFSCREEN: raise RuntimeError('Offscreen rendering not supported') else: mjr_setBuffer(mjFB_WINDOW, &self._con); if self._con.currentBuffer != mjFB_WINDOW: raise RuntimeError('Window rendering not supported') self.con = WrapMjrContext(&self._con) def _setup_opengl_context(self, offscreen, device_id, opengl_backend, quiet=False): if opengl_backend is None and (not offscreen or sys.platform == 'darwin'): # default to glfw for onscreen viewing or mac (both offscreen/onscreen) opengl_backend = 'glfw' if opengl_backend == 'glfw': self.opengl_context = GlfwContext(offscreen=offscreen, quiet=quiet) else: if device_id < 0: if "GPUS" in os.environ: device_id = os.environ["GPUS"] else: device_id = os.getenv('CUDA_VISIBLE_DEVICES', '') if len(device_id) > 0: device_id = int(device_id.split(',')[0]) else: # Sometimes env variable is an empty string. device_id = 0 self.opengl_context = OffscreenOpenGLContext(device_id) def _init_camera(self, sim): # Make the free camera look at the scene self.cam.type = const.CAMERA_FREE self.cam.fixedcamid = -1 for i in range(3): self.cam.lookat[i] = np.median(sim.data.geom_xpos[:, i]) self.cam.distance = sim.model.stat.extent def update_offscreen_size(self, width, height): if width != self._con.offWidth or height != self._con.offHeight: self._model_ptr.vis.global_.offwidth = width self._model_ptr.vis.global_.offheight = height mjr_freeContext(&self._con) self._set_mujoco_buffers() def render(self, width, height, camera_id=None, segmentation=False): cdef mjrRect rect rect.left = 0 rect.bottom = 0 rect.width = width rect.height = height if self.sim.render_callback is not None: self.sim.render_callback(self.sim, self) # Sometimes buffers are too small. if width > self._con.offWidth or height > self._con.offHeight: new_width = max(width, self._model_ptr.vis.global_.offwidth) new_height = max(height, self._model_ptr.vis.global_.offheight) self.update_offscreen_size(new_width, new_height) if camera_id is not None: if camera_id == -1: self.cam.type = const.CAMERA_FREE else: self.cam.type = const.CAMERA_FIXED self.cam.fixedcamid = camera_id # This doesn't really do anything else rather than checking for the size of buffer # need to investigate further whi is that a no-op # self.opengl_context.set_buffer_size(width, height) mjv_updateScene(self._model_ptr, self._data_ptr, &self._vopt, &self._pert, &self._cam, mjCAT_ALL, &self._scn) if segmentation: self._scn.flags[const.RND_SEGMENT] = 1 self._scn.flags[const.RND_IDCOLOR] = 1 for marker_params in self._markers: self._add_marker_to_scene(marker_params) mjr_render(rect, &self._scn, &self._con) for gridpos, (text1, text2) in self._overlay.items(): mjr_overlay(const.FONTSCALE_150, gridpos, rect, text1.encode(), text2.encode(), &self._con) if segmentation: self._scn.flags[const.RND_SEGMENT] = 0 self._scn.flags[const.RND_IDCOLOR] = 0 def read_pixels(self, width, height, depth=True, segmentation=False): cdef mjrRect rect rect.left = 0 rect.bottom = 0 rect.width = width rect.height = height rgb_arr = np.zeros(3 * rect.width * rect.height, dtype=np.uint8) depth_arr = np.zeros(rect.width * rect.height, dtype=np.float32) cdef unsigned char[::view.contiguous] rgb_view = rgb_arr cdef float[::view.contiguous] depth_view = depth_arr mjr_readPixels(&rgb_view[0], &depth_view[0], rect, &self._con) rgb_img = rgb_arr.reshape(rect.height, rect.width, 3) cdef np.ndarray[np.npy_uint32, ndim=2] seg_img cdef np.ndarray[np.npy_int32, ndim=2] seg_ids ret_img = rgb_img if segmentation: seg_img = (rgb_img[:, :, 0] + rgb_img[:, :, 1] * (2**8) + rgb_img[:, :, 2] * (2 ** 16)) seg_img[seg_img >= (self._scn.ngeom + 1)] = 0 seg_ids = np.full((self._scn.ngeom + 1, 2), fill_value=-1, dtype=np.int32) for i in range(self._scn.ngeom): geom = self._scn.geoms[i] if geom.segid != -1: seg_ids[geom.segid + 1, 0] = geom.objtype seg_ids[geom.segid + 1, 1] = geom.objid ret_img = seg_ids[seg_img] if depth: depth_img = depth_arr.reshape(rect.height, rect.width) return (ret_img, depth_img) else: return ret_img def read_pixels_depth(self, np.ndarray[np.float32_t, mode="c", ndim=2] buffer): ''' Read depth pixels into a preallocated buffer ''' cdef mjrRect rect rect.left = 0 rect.bottom = 0 rect.width = buffer.shape[1] rect.height = buffer.shape[0] cdef float[::view.contiguous] buffer_view = buffer.ravel() mjr_readPixels(NULL, &buffer_view[0], rect, &self._con) def upload_texture(self, int tex_id): """ Uploads given texture to the GPU. """ self.opengl_context.make_context_current() mjr_uploadTexture(self._model_ptr, &self._con, tex_id) def draw_pixels(self, np.ndarray[np.uint8_t, ndim=3] image, int left, int bottom): """Draw an image into the OpenGL buffer.""" cdef unsigned char[::view.contiguous] image_view = image.ravel() cdef mjrRect viewport viewport.left = left viewport.bottom = bottom viewport.width = image.shape[1] viewport.height = image.shape[0] mjr_drawPixels(&image_view[0], NULL, viewport, &self._con) def move_camera(self, int action, double reldx, double reldy): """ Moves the camera based on mouse movements. Action is one of mjMOUSE_*. """ mjv_moveCamera(self._model_ptr, action, reldx, reldy, &self._scn, &self._cam) def add_overlay(self, int gridpos, str text1, str text2): """ Overlays text on the scene. """ if gridpos not in self._overlay: self._overlay[gridpos] = ["", ""] self._overlay[gridpos][0] += text1 + "\n" self._overlay[gridpos][1] += text2 + "\n" def add_marker(self, **marker_params): self._markers.append(marker_params) def _add_marker_to_scene(self, marker_params): """ Adds marker to scene, and returns the corresponding object. """ if self._scn.ngeom >= self._scn.maxgeom: raise RuntimeError('Ran out of geoms. maxgeom: %d' % self._scn.maxgeom) cdef mjvGeom *g = self._scn.geoms + self._scn.ngeom # default values. g.dataid = -1 g.objtype = const.OBJ_UNKNOWN g.objid = -1 g.category = const.CAT_DECOR g.texid = -1 g.texuniform = 0 g.texrepeat[0] = 1 g.texrepeat[1] = 1 g.emission = 0 g.specular = 0.5 g.shininess = 0.5 g.reflectance = 0 g.type = const.GEOM_BOX g.size[:] = np.ones(3) * 0.1 g.mat[:] = np.eye(3).flatten() g.rgba[:] = np.ones(4) wrapped = WrapMjvGeom(g) for key, value in marker_params.items(): if isinstance(value, (int, float)): setattr(wrapped, key, value) elif isinstance(value, (tuple, list, np.ndarray)): attr = getattr(wrapped, key) attr[:] = np.asarray(value).reshape(attr.shape) elif isinstance(value, str): assert key == "label", "Only label is a string in mjvGeom." if value == None: g.label[0] = 0 else: strncpy(g.label, value.encode(), 100) elif hasattr(wrapped, key): raise ValueError("mjvGeom has attr {} but type {} is invalid".format(key, type(value))) else: raise ValueError("mjvGeom doesn't have field %s" % key) self._scn.ngeom += 1 def __dealloc__(self): mjr_freeContext(&self._con) mjv_freeScene(&self._scn) class MjRenderContextOffscreen(MjRenderContext): def __cinit__(self, MjSim sim, int device_id): super().__init__(sim, offscreen=True, device_id=device_id) class MjRenderContextWindow(MjRenderContext): def __init__(self, MjSim sim): super().__init__(sim, offscreen=False) self.render_swap_callback = None assert isinstance(self.opengl_context, GlfwContext), ( "Only GlfwContext supported for windowed rendering") @property def window(self): return self.opengl_context.window def render(self): if self.window is None or glfw.window_should_close(self.window): return glfw.make_context_current(self.window) super().render(*glfw.get_framebuffer_size(self.window)) if self.render_swap_callback is not None: self.render_swap_callback() glfw.swap_buffers(self.window)