mujoco_py/mjpid.pyx (175 lines of code) (raw):
from libc.math cimport fabs, fmax, fmin
from mujoco_py.generated import const
cdef int CONTROLLER_TYPE_PI_CASCADE = 1,
cdef struct PIDErrors:
double error
double integral_error
double derivative_error
cdef struct PIDOutput:
PIDErrors errors
double output
cdef struct PIDParameters:
double dt_seconds # PID sampling time.
double setpoint
double feedback
double Kp
double error_deadband
double integral_max_clamp
double integral_time_const
double derivative_gain_smoothing
double derivative_time_const
PIDErrors previous_errors
cdef mjtNum c_zero_gains(const mjModel*m, const mjData*d, int id) with gil:
return 0.0
cdef PIDOutput _pid(PIDParameters parameters):
"""
A general purpose PID controller implemented in the standard form.
Kp == Kp
Ki == Kp/Ti
Kd == Kp*Td
In this situation, Kp is a knob to tune the agressiveness, wheras Ti and Td will
change the response time of the system in a predictable way. Lower Ti or Td means
that the system will respond to error more quickly/agressively.
error deadband: if set will shrink error within to 0.0
clamp on integral term: helps on saturation problem in I.
derivative smoothing term: reduces high frequency noise in D.
:param parameters: PID parameters
:return: A PID output struct containing the control output and the error state
"""
cdef double error = parameters.setpoint - parameters.feedback
# Clamp error that's within the error deadband
if fabs(error) < parameters.error_deadband:
error = 0.0
# Compute derivative error
cdef double derivative_error = (error - parameters.previous_errors.error) / parameters.dt_seconds
derivative_error = (1.0 - parameters.derivative_gain_smoothing) * parameters.previous_errors.derivative_error + \
parameters.derivative_gain_smoothing * derivative_error
cdef double derivative_error_term = derivative_error * parameters.derivative_time_const
# Update and clamp integral error
integral_error = parameters.previous_errors.integral_error
integral_error += error * parameters.dt_seconds
integral_error = fmax(-parameters.integral_max_clamp, fmin(parameters.integral_max_clamp, integral_error))
cdef double integral_error_term = 0.0
if parameters.integral_time_const != 0:
integral_error_term = integral_error / parameters.integral_time_const
f = parameters.Kp * (error + integral_error_term + derivative_error_term)
return PIDOutput(output=f, errors=PIDErrors(error=error, derivative_error=derivative_error, integral_error=integral_error))
cdef enum USER_DEFINED_ACTUATOR_PARAMS:
IDX_PROPORTIONAL_GAIN = 0,
IDX_INTEGRAL_TIME_CONSTANT = 1,
IDX_INTEGRAL_MAX_CLAMP = 2,
IDX_DERIVATIVE_TIME_CONSTANT = 3,
IDX_DERIVATIVE_GAIN_SMOOTHING = 4,
IDX_ERROR_DEADBAND = 5,
cdef enum USER_DEFINED_CONTROLLER_DATA_PID:
IDX_INTEGRAL_ERROR = 0,
IDX_ERROR = 1,
IDX_DERIVATIVE_ERROR = 2,
NUM_USER_DATA_PER_ACT_PID = 3,
cdef mjtNum c_pid_bias(const mjModel*m, const mjData*d, int id):
"""
To activate PID, set gainprm="Kp Ti Td iClamp errBand iSmooth" in a general type actuator in mujoco xml
"""
cdef double dt_in_sec = m.opt.timestep
cdef int NGAIN = int(const.NGAIN)
result = _pid(parameters=PIDParameters(
dt_seconds=dt_in_sec,
setpoint=d.ctrl[id],
feedback=d.actuator_length[id],
Kp=m.actuator_gainprm[id * NGAIN + IDX_PROPORTIONAL_GAIN],
error_deadband=m.actuator_gainprm[id * NGAIN + IDX_ERROR_DEADBAND],
integral_max_clamp=m.actuator_gainprm[id * NGAIN + IDX_INTEGRAL_MAX_CLAMP],
integral_time_const=m.actuator_gainprm[id * NGAIN + IDX_INTEGRAL_TIME_CONSTANT],
derivative_gain_smoothing=m.actuator_gainprm[id * NGAIN + IDX_DERIVATIVE_GAIN_SMOOTHING],
derivative_time_const=m.actuator_gainprm[id * NGAIN + IDX_DERIVATIVE_TIME_CONSTANT],
previous_errors=PIDErrors(
error=d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_ERROR],
derivative_error=d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_DERIVATIVE_ERROR],
integral_error=d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_INTEGRAL_ERROR],
)))
d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_ERROR] = result.errors.error
d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_DERIVATIVE_ERROR] = result.errors.derivative_error
d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_INTEGRAL_ERROR] = result.errors.integral_error
f = result.output
cdef double effort_limit_low = m.actuator_forcerange[id * 2]
cdef double effort_limit_high = m.actuator_forcerange[id * 2 + 1]
if effort_limit_low != 0.0 or effort_limit_high != 0.0:
f = fmax(effort_limit_low, fmin(effort_limit_high, f))
return f
cdef enum USER_DEFINED_ACTUATOR_PARAMS_CASCADE:
IDX_CAS_PROPORTIONAL_GAIN = 0,
IDX_CAS_INTEGRAL_TIME_CONSTANT = 1,
IDX_CAS_INTEGRAL_MAX_CLAMP = 2,
IDX_CAS_DERIVATIVE_TIME_CONSTANT = 3,
IDX_CAS_DERIVATIVE_GAIN_SMOOTHING = 4,
IDX_CAS_PROPORTIONAL_GAIN_V = 5,
IDX_CAS_INTEGRAL_TIME_CONSTANT_V = 6,
IDX_CAS_INTEGRAL_MAX_CLAMP_V = 7,
IDX_CAS_EMA_SMOOTH = 8,
IDX_CAS_MAX_VEL = 9,
cdef enum USER_DEFINED_CONTROLLER_DATA_CASCADE:
IDX_CAS_ERROR = 0,
IDX_CAS_INTEGRAL_ERROR = 1,
IDX_CAS_DERIVATIVE_ERROR = 2,
IDX_CAS_INTEGRAL_ERROR_V = 3,
IDX_CAS_STORED_EMA_SMOOTH = 4,
NUM_USER_DATA_PER_ACT_CAS = 5,
cdef NUM_USER_DATA_PER_ACT = max(int(NUM_USER_DATA_PER_ACT_PID), int(NUM_USER_DATA_PER_ACT_CAS))
cdef mjtNum c_pi_cascade_bias(const mjModel*m, const mjData*d, int id):
"""
A cascaded PID-PI controller implementation that can control position
and velocity setpoints for a given actuator.
The cascaded controller is implemented as a nested position-velocity controller. A PID loop is
wrapped around desired position and a PI loop is wrapped around desired velocity. An exponential
moving average filter is applied to the commanded position input (d.ctrl). Additionally the controller
is gravity compensated via the `qfrc_bias` term.
The PID-PI gains are set as part of gainprm in the following order:
`gainprm=" Kp_pos -> Position loop proportional gain
Ti_pos -> Position loop integral time constant
Ti_max_clamp_pos -> Position loop integral error clamp
Td_pos -> Position loop derivative time constant
Td_smooth_pos -> Position loop derivative smoothing (EMA)
Kp_vel -> Velocity loop proportional gain
Ti_vel -> Velocity loop integral time constant
max_clamp_vel -> Velocity loop integral error clamp
ema_smooth_factor -> Exponential moving average (EMA) on desired position
max_vel"` -> Clamped velocity limit (applied in positive and negative direction)
"""
cdef double dt_in_sec = m.opt.timestep
cdef int NGAIN = int(const.NGAIN)
cdef double smooth_pos_setpoint;
cdef double Kp_cas = m.actuator_gainprm[id * NGAIN + IDX_CAS_PROPORTIONAL_GAIN]
# Apply Exponential Moving Average smoothing to the position setpoint, applying warmstart
# on the first iteration.
if d.time > 0.0:
ctrl_ema = d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_STORED_EMA_SMOOTH]
smoothing_factor = m.actuator_gainprm[id * NGAIN + IDX_CAS_EMA_SMOOTH]
smooth_pos_setpoint = (smoothing_factor * ctrl_ema) + (1 - smoothing_factor) * d.ctrl[id]
else:
smooth_pos_setpoint = d.ctrl[id]
d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_STORED_EMA_SMOOTH] = smooth_pos_setpoint
if Kp_cas != 0:
# Run a position PID loop and use the result to set a desired velocity signal
pos_output = _pid(parameters=PIDParameters(
dt_seconds=dt_in_sec,
setpoint=smooth_pos_setpoint,
feedback=d.actuator_length[id],
Kp=Kp_cas,
error_deadband=0.0,
integral_max_clamp=m.actuator_gainprm[id * NGAIN + IDX_CAS_INTEGRAL_MAX_CLAMP],
integral_time_const=m.actuator_gainprm[id * NGAIN + IDX_CAS_INTEGRAL_TIME_CONSTANT],
derivative_gain_smoothing=m.actuator_gainprm[id * NGAIN + IDX_CAS_DERIVATIVE_GAIN_SMOOTHING],
derivative_time_const=m.actuator_gainprm[id * NGAIN + IDX_CAS_DERIVATIVE_TIME_CONSTANT],
previous_errors=PIDErrors(
error=d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_ERROR],
derivative_error=d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_DERIVATIVE_ERROR],
integral_error=d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_INTEGRAL_ERROR],
)))
# Save errors
d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_ERROR] = pos_output.errors.error
d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_DERIVATIVE_ERROR] = pos_output.errors.derivative_error
d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_INTEGRAL_ERROR] = pos_output.errors.integral_error
des_vel = pos_output.output
else:
# If P gain on position loop is zero, only use the velocity controller
des_vel = d.ctrl[id]
# Clamp max angular velocity
max_qvel = m.actuator_gainprm[id * NGAIN + IDX_CAS_MAX_VEL]
des_vel = fmax(-max_qvel, fmin(max_qvel, des_vel))
vel_output = _pid(parameters=PIDParameters(
dt_seconds=dt_in_sec,
setpoint=des_vel,
feedback=d.actuator_velocity[id],
Kp=m.actuator_gainprm[id * NGAIN + IDX_CAS_PROPORTIONAL_GAIN_V],
error_deadband=0.0,
integral_max_clamp=m.actuator_gainprm[id * NGAIN + IDX_CAS_INTEGRAL_MAX_CLAMP_V],
integral_time_const=m.actuator_gainprm[id * NGAIN + IDX_CAS_INTEGRAL_TIME_CONSTANT_V],
derivative_gain_smoothing=0.0,
derivative_time_const=0.0,
previous_errors=PIDErrors(
error=0.0,
derivative_error=0.0,
integral_error=d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_INTEGRAL_ERROR_V],
)))
# Save errors
d.userdata[id * NUM_USER_DATA_PER_ACT + IDX_CAS_INTEGRAL_ERROR_V] = vel_output.errors.integral_error
# Limit max torque at the output
cdef double effort_limit_low = m.actuator_forcerange[id * 2]
cdef double effort_limit_high = m.actuator_forcerange[id * 2 + 1]
f = vel_output.output
# Gravity compensation
f += d.qfrc_bias[id]
if effort_limit_low != 0.0 or effort_limit_high != 0.0:
f = fmax(effort_limit_low, fmin(effort_limit_high, f))
return f
cdef enum USER_DEFINED_ACTUATOR_DATA:
IDX_CONTROLLER_TYPE = 0
NUM_ACTUATOR_DATA = 1
cdef mjtNum c_custom_bias(const mjModel*m, const mjData*d, int id) with gil:
"""
Switches between PID and Cascaded PID-PI type custom bias computation based on the
defined actuator's actuator_user field.
user="1": Cascade PID-PI
default: PID
:param m: mjModel
:param d: mjData
:param id: actuator ID
:return: Custom actuator force
"""
controller_type = int(m.actuator_user[id * m.nuser_actuator + IDX_CONTROLLER_TYPE])
if controller_type == CONTROLLER_TYPE_PI_CASCADE:
return c_pi_cascade_bias(m, d, id)
return c_pid_bias(m, d, id)
def set_pid_control(m, d):
global mjcb_act_gain
global mjcb_act_bias
if m.nuserdata < m.nu * NUM_USER_DATA_PER_ACT:
raise Exception('nuserdata is not set large enough to store PID internal states.')
if m.nuser_actuator < m.nu * NUM_ACTUATOR_DATA:
raise Exception('nuser_actuator is not set large enough to store controller types')
for i in range(m.nuserdata):
d.userdata[i] = 0.0
mjcb_act_gain = c_zero_gains
mjcb_act_bias = c_custom_bias