in lerobot/common/robots/lekiwi/lekiwi_host.py [0:0]
def main():
logging.info("Configuring LeKiwi")
robot_config = LeKiwiConfig()
robot = LeKiwi(robot_config)
logging.info("Connecting LeKiwi")
robot.connect()
logging.info("Starting HostAgent")
host_config = LeKiwiHostConfig()
host = LeKiwiHost(host_config)
last_cmd_time = time.time()
watchdog_active = False
logging.info("Waiting for commands...")
try:
# Business logic
start = time.perf_counter()
duration = 0
while duration < host.connection_time_s:
loop_start_time = time.time()
try:
msg = host.zmq_cmd_socket.recv_string(zmq.NOBLOCK)
data = dict(json.loads(msg))
_action_sent = robot.send_action(data)
last_cmd_time = time.time()
watchdog_active = False
except zmq.Again:
if not watchdog_active:
logging.warning("No command available")
except Exception as e:
logging.error("Message fetching failed: %s", e)
now = time.time()
if (now - last_cmd_time > host.watchdog_timeout_ms / 1000) and not watchdog_active:
logging.warning(
f"Command not received for more than {host.watchdog_timeout_ms} milliseconds. Stopping the base."
)
watchdog_active = True
robot.stop_base()
last_observation = robot.get_observation()
# Encode ndarrays to base64 strings
for cam_key, _ in robot.cameras.items():
ret, buffer = cv2.imencode(
".jpg", last_observation[cam_key], [int(cv2.IMWRITE_JPEG_QUALITY), 90]
)
if ret:
last_observation[cam_key] = base64.b64encode(buffer).decode("utf-8")
else:
last_observation[cam_key] = ""
# Send the observation to the remote agent
try:
host.zmq_observation_socket.send_string(json.dumps(last_observation), flags=zmq.NOBLOCK)
except zmq.Again:
logging.info("Dropping observation, no client connected")
# Ensure a short sleep to avoid overloading the CPU.
elapsed = time.time() - loop_start_time
time.sleep(max(1 / host.max_loop_freq_hz - elapsed, 0))
duration = time.perf_counter() - start
print("Cycle time reached.")
except KeyboardInterrupt:
print("Keyboard interrupt received. Exiting...")
finally:
print("Shutting down Lekiwi Host.")
robot.disconnect()
host.disconnect()
logging.info("Finished LeKiwi cleanly")