lerobot/common/robots/lekiwi/lekiwi_host.py (85 lines of code) (raw):

#!/usr/bin/env python

# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import json
import logging
import time

import cv2
import zmq

from .config_lekiwi import LeKiwiConfig, LeKiwiHostConfig
from .lekiwi import LeKiwi


class LeKiwiHost:
    """ZMQ endpoint pair used by the host loop in ``main``.

    Binds a PULL socket on ``config.port_zmq_cmd`` (incoming commands) and a
    PUSH socket on ``config.port_zmq_observations`` (outgoing observations),
    and copies the timing settings from ``config`` onto the instance.
    """

    def __init__(self, config: LeKiwiHostConfig):
        self.zmq_context = zmq.Context()

        # CONFLATE must be set before bind; per the ZeroMQ documentation the
        # socket then keeps only the most recent message in its queue.
        self.zmq_cmd_socket = self.zmq_context.socket(zmq.PULL)
        self.zmq_cmd_socket.setsockopt(zmq.CONFLATE, 1)
        self.zmq_cmd_socket.bind(f"tcp://*:{config.port_zmq_cmd}")

        self.zmq_observation_socket = self.zmq_context.socket(zmq.PUSH)
        self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1)
        self.zmq_observation_socket.bind(f"tcp://*:{config.port_zmq_observations}")

        # Total run time of the host loop, in seconds.
        self.connection_time_s = config.connection_time_s
        # Maximum silence on the command socket before the base is stopped, in milliseconds.
        self.watchdog_timeout_ms = config.watchdog_timeout_ms
        # Upper bound on the host loop rate, in Hz.
        self.max_loop_freq_hz = config.max_loop_freq_hz

    def disconnect(self) -> None:
        """Close both sockets, then terminate the ZMQ context."""
        self.zmq_observation_socket.close()
        self.zmq_cmd_socket.close()
        self.zmq_context.term()


def main() -> None:
    """Run the LeKiwi host loop until ``connection_time_s`` elapses or Ctrl-C.

    Each iteration:
      1. polls the command socket without blocking and forwards a received
         command to ``robot.send_action``;
      2. calls ``robot.stop_base()`` once if no command arrived for more than
         ``watchdog_timeout_ms``;
      3. reads an observation, JPEG/base64-encodes every camera entry and
         pushes the observation as JSON (dropped when the send would block);
      4. sleeps so the loop does not exceed ``max_loop_freq_hz``.

    The robot is always disconnected on exit, and the ZMQ host is torn down
    whenever it was successfully created.
    """
    logging.info("Configuring LeKiwi")
    robot_config = LeKiwiConfig()
    robot = LeKiwi(robot_config)

    logging.info("Connecting LeKiwi")
    robot.connect()

    # Created inside the try block so that a failure while binding the
    # sockets still disconnects the already-connected robot.
    host = None
    try:
        logging.info("Starting HostAgent")
        host_config = LeKiwiHostConfig()
        host = LeKiwiHost(host_config)

        # Monotonic clocks: a wall-clock adjustment must neither trip nor
        # silence the watchdog, nor distort the loop pacing.
        last_cmd_time = time.monotonic()
        watchdog_active = False
        logging.info("Waiting for commands...")

        # Business logic
        start = time.perf_counter()
        duration = 0
        while duration < host.connection_time_s:
            loop_start_time = time.perf_counter()
            try:
                msg = host.zmq_cmd_socket.recv_string(zmq.NOBLOCK)
                data = dict(json.loads(msg))
                robot.send_action(data)
                last_cmd_time = time.monotonic()
                watchdog_active = False
            except zmq.Again:
                # Nothing queued on the command socket this iteration.
                if not watchdog_active:
                    logging.warning("No command available")
            except Exception as e:
                # Best effort: a bad message or failed action must not end the loop.
                logging.error("Message fetching failed: %s", e)

            now = time.monotonic()
            if (now - last_cmd_time > host.watchdog_timeout_ms / 1000) and not watchdog_active:
                logging.warning(
                    "Command not received for more than %s milliseconds. Stopping the base.",
                    host.watchdog_timeout_ms,
                )
                # Latch so the base is stopped once per silent period, not every iteration.
                watchdog_active = True
                robot.stop_base()

            last_observation = robot.get_observation()

            # Encode camera frames (presumably ndarrays) to base64 JPEG strings
            # so the observation can be serialized as JSON.
            for cam_key in robot.cameras:
                ret, buffer = cv2.imencode(
                    ".jpg", last_observation[cam_key], [int(cv2.IMWRITE_JPEG_QUALITY), 90]
                )
                if ret:
                    last_observation[cam_key] = base64.b64encode(buffer).decode("utf-8")
                else:
                    last_observation[cam_key] = ""

            # Send the observation to the remote agent
            try:
                host.zmq_observation_socket.send_string(json.dumps(last_observation), flags=zmq.NOBLOCK)
            except zmq.Again:
                logging.info("Dropping observation, no client connected")

            # Ensure a short sleep to avoid overloading the CPU.
            elapsed = time.perf_counter() - loop_start_time
            time.sleep(max(1 / host.max_loop_freq_hz - elapsed, 0))
            duration = time.perf_counter() - start
        print("Cycle time reached.")

    except KeyboardInterrupt:
        print("Keyboard interrupt received. Exiting...")
    finally:
        print("Shutting down Lekiwi Host.")
        try:
            robot.disconnect()
        finally:
            # Release the sockets/context even if the robot teardown raised.
            if host is not None:
                host.disconnect()

    logging.info("Finished LeKiwi cleanly")


if __name__ == "__main__":
    main()