gym3/video_recorder.py (82 lines of code) (raw):

import os from typing import Any, Mapping, Optional import imageio import numpy as np from gym3.env import Env from gym3.internal.renderer import Renderer from gym3.wrapper import Wrapper class VideoRecorderWrapper(Wrapper): """ Record observations of each episode from an environment to a video file Subclasses may want to override `_process_frame` :param env: environment to record from :param directory: directory to save videos to, will be created if it does not exist :param env_index: the index of the environment to record :param ob_key: by default the observation is recorded for the video, if the observation is a dictionary, you can specify which key to record using this argument :param info_key: if the frame you want to record is in the environment info dictionary, specify the key here, e.g. "rgb" :param prefix: filename prefix to use when creating videos :param fps: fps to give to encoder, this depends on your environment and the resulting video will play back too quickly or too slowly depending on this value :param writer_kwargs: extra arguments to supply to the imageio writer :param render: if set to True, also show the current frame being recorded in a window """ def __init__( self, env: Env, directory: str, env_index: int = 0, ob_key: Optional[str] = None, info_key: Optional[str] = None, prefix: str = "", fps: int = 15, writer_kwargs: Optional[Mapping[str, Any]] = None, render=False, ) -> None: super().__init__(env=env) if info_key is not None: assert ob_key is None, "can't specify both info_key and ob_key" self._prefix = prefix self._directory = os.path.abspath(directory) os.makedirs(self._directory, exist_ok=True) self._ob_key = ob_key self._info_key = info_key self._env_index = env_index self._episode_count = 0 self._writer = None if writer_kwargs is None: writer_kwargs = {"output_params": ["-f", "mp4"]} self._writer_kwargs = writer_kwargs self._fps = fps self.videopath = None self._first_step = True self._renderer = Renderer(width=768, height=768) if render else None def _restart_recording(self) -> None: if self._writer is not None: self._writer.close() self.videopath = os.path.join( self._directory, f"{self._prefix}{self._episode_count:05d}.mp4" ) self._writer = imageio.get_writer( self.videopath, format="ffmpeg", fps=self._fps, **self._writer_kwargs ) def _append_observation(self) -> None: _, ob, _ = self.observe() if self._info_key is None: if self._ob_key is not None: ob = ob[self._ob_key] img = ob[self._env_index] else: info = self.get_info() img = info[self._env_index].get(self._info_key) # the first info for a converted environment may be empty if self._first_step and img is None: return frame = self._process_frame(img.astype(np.uint8)) self._writer.append_data(frame) if self._renderer is not None: self._renderer.start() self._renderer.draw_bitmap( 0, 0, self._renderer.width, self._renderer.height, image=frame ) self._renderer.finish() def _process_frame(self, frame: np.ndarray) -> np.ndarray: return frame def act(self, ac: Any) -> None: if self._first_step: # first action of the episode, get the existing observation before # taking an action self._restart_recording() self._append_observation() super().act(ac) self._first_step = False _, _, first = self.observe() if first[self._env_index]: self._episode_count += 1 self._first_step = True self._writer.close() self._writer = None else: self._append_observation()