in polymetis/python/polysim/grpc_sim_client.py [0:0]
def run(self, time_horizon=float("inf")):
"""Start running the simulation and querying the server.
Args:
time_horizon: If finite, the number of timesteps to stop the simulation.
"""
msg = self.connection.InitRobotClient(self.metadata.get_proto())
robot_state = polymetis_pb2.RobotState()
# Main loop
t = 0
spinner = Spinner(self.hz)
while t < time_horizon:
# Get robot state from env
joint_pos, joint_vel = self.env.get_current_joint_pos_vel()
robot_state.joint_positions[:] = joint_pos
robot_state.joint_velocities[:] = joint_vel
(
torques_commanded,
torques_applied,
torques_measured,
torques_external,
) = self.env.get_current_joint_torques()
robot_state.prev_joint_torques_computed[:] = torques_commanded
robot_state.prev_joint_torques_computed_safened[:] = torques_applied
robot_state.motor_torques_measured[:] = torques_measured
robot_state.motor_torques_external[:] = torques_external
robot_state.timestamp.GetCurrentTime()
# Query controller manager server for action
# TODO: have option for async mode through async calls, see
# https://grpc.io/docs/languages/python/basics/#simple-rpc-1
log_request_time = self.log_interval > 0 and t % self.log_interval == 0
msg = self.execute_rpc_call(
self.connection.ControlUpdate,
[robot_state],
log_request_time=log_request_time,
)
# Apply action to env
torque_command = np.array([t for t in msg.joint_torques])
self.env.apply_joint_torques(torque_command)
# Idle for the remainder of loop time
t += 1
spinner.spin()