in src/envs/ode.py [0:0]
def gen_control(self, return_system=False, skip_unstable=False):
"""
Generate systems of functions, data for controlability
"""
degree = self.rng.randint(self.min_degree, self.max_degree + 1)
p = self.rng.randint(1, degree // 2 + 1)
nb_ops = self.rng.randint(degree + p, 2 * (degree + p) + 3, size=(degree,))
while True:
system = []
i = 0
ngen = 0
while i < degree:
# generate expression
expr = self.generate_tree(
nb_ops[i], degree + p
) # si tau>0 doit on garantir l'existence de t (x{degree + p})?
ngen += 1
# sympy zone
try:
expr_sp = sp.S(expr, locals=self.local_dict)
# skip constant or invalid expressions
if len(expr_sp.free_symbols) == 0 or has_inf_nan(expr_sp):
continue
# evaluate gradient in point
valA, valB = self.compute_gradient_control(
expr_sp, self.eval_point, degree, p
)
if (
np.isnan(valA).any()
or np.isinf(valA).any()
or np.isnan(valB).any()
or np.isinf(valB).any()
):
continue
if self.skip_zero_gradient and not valA.any():
continue
except TimeoutError:
continue
except (ValueError, TypeError):
continue
except Exception as e:
logger.error(
"An unknown exception of type {0} occurred in line {1} "
'for expression "{2}". Arguments:{3!r}.'.format(
type(e).__name__,
sys.exc_info()[-1].tb_lineno,
expr_sp,
e.args,
)
)
continue
system.append(expr)
if i == 0:
A = valA
B = valB
else:
A = np.vstack((A, valA))
B = np.vstack((B, valB))
i += 1
if self.skip_zero_gradient:
skip = False
for i in range(degree):
if not A[:, [i]].any():
skip = True
break
for i in range(p):
if not B[:, [i]].any():
skip = True
break
if skip:
continue
try:
C = ctrl.ctrb(A, B)
d = degree - np.linalg.matrix_rank(C, 1.0e-6)
if d != 0 and (skip_unstable or self.prob_positive > 0.0):
continue
if self.predict_gramian and d == 0:
# C = ctrl.lyap(A, - B @ B.T)
# K = - B.T @ np.linalg.inv(C)
A = A / np.linalg.norm(A)
B = B / np.linalg.norm(A)
tau = 1
yint = []
# We want to integrate a matrix over [0,tau]
# and all the integrate functions I found are for scalars.
# So we do it term by term
for i in range(degree): # divide in row
yint_line = []
for j in range(degree): # divide in column
dt = np.linspace(
0, tau, num=40
) # integration path [0,tau] and 40 points
yint0 = []
for k in range(len(dt)):
# vector i with the component to be integrated (i,j),
# evaluated at each point of the integration path
res = (
(expm(A * (tau - dt[k])))
@ (B @ B.T)
@ (expm(A.T * (tau - dt[k])))
)[i]
yint0.append(
res[j]
) # vector of the component (i,j) along itegration path
resline = (cumtrapz(yint0, dt, initial=0))[
len(dt) - 1
] # integration with cumulative trapezz
yint_line.append(resline) # reconstruct the line
yint.append(yint_line) # reconstruct the matrix
if np.isnan(yint).any() or np.isinf(yint).any():
continue
Ctau = (
expm(-tau * A) @ np.array(yint) @ expm(-tau * A.T)
) # From the gramian to the true C
if np.isnan(Ctau).any() or np.isinf(Ctau).any():
continue
K = -B.T @ (np.linalg.inv(Ctau + 1e-6 * np.eye(degree)))
if np.isnan(K).any() or np.isinf(K).any():
continue
with np.nditer(K, op_flags=["readwrite"]) as it:
for x in it:
x[...] = float(f"%.{self.jacobian_precision}e" % x)
if max(np.linalg.eigvals(A + B @ K).real) > 0:
# Check that A+B@K is stable, which is equivalent to
# check_gramian
# print("UNSTABLE")
continue
except Exception:
# logger.error("An unknown exception of type {0} occurred
# in line {1} for expression \"{2}\". Arguments:{3!r}.".format(
# type(e).__name__, sys.exc_info()[-1].tb_lineno, expr_sp, e.args))
continue
break
# # debug
# logger.info(str(cspeed))
# logger.info(str(cspeed) + "\t" + " ||||| ".join(str(s) for s in system[:3]))
# print(degree, str(ngen) + " : " + str((ngen - degree) / ngen * 100.0))
# encode input
x = self.write_int(degree)
for s in system:
x.append(self.func_separator)
x.extend(self.encode_expr(s))
# encode output: dimension of control subspace and optionally the Gramian matrix
if self.qualitative:
controlable = 1 if d == 0 else 0
y = self.write_int(controlable)
else:
y = self.write_int(d)
if self.predict_gramian and d == 0:
K = np.array(K)
y.append(self.mtrx_separator)
for row in K:
for value in row:
y.extend(self.write_complex(value, self.jacobian_precision))
y.append(self.list_separator)
y.append(self.line_separator)
if self.max_len > 0 and (len(x) >= self.max_len or len(y) >= self.max_len):
return None
if return_system:
return x, y, system, p
else:
return x, y