def gen_control()

in src/envs/ode.py [0:0]


    def gen_control(self, return_system=False, skip_unstable=False):
        """
        Generate systems of functions, data for controlability
        """
        degree = self.rng.randint(self.min_degree, self.max_degree + 1)
        p = self.rng.randint(1, degree // 2 + 1)
        nb_ops = self.rng.randint(degree + p, 2 * (degree + p) + 3, size=(degree,))
        while True:
            system = []
            i = 0
            ngen = 0
            while i < degree:
                # generate expression
                expr = self.generate_tree(
                    nb_ops[i], degree + p
                )  # si tau>0 doit on garantir l'existence de t (x{degree + p})?
                ngen += 1
                # sympy zone
                try:
                    expr_sp = sp.S(expr, locals=self.local_dict)
                    # skip constant or invalid expressions
                    if len(expr_sp.free_symbols) == 0 or has_inf_nan(expr_sp):
                        continue
                    # evaluate gradient in point
                    valA, valB = self.compute_gradient_control(
                        expr_sp, self.eval_point, degree, p
                    )
                    if (
                        np.isnan(valA).any()
                        or np.isinf(valA).any()
                        or np.isnan(valB).any()
                        or np.isinf(valB).any()
                    ):
                        continue
                    if self.skip_zero_gradient and not valA.any():
                        continue
                except TimeoutError:
                    continue
                except (ValueError, TypeError):
                    continue
                except Exception as e:
                    logger.error(
                        "An unknown exception of type {0} occurred in line {1} "
                        'for expression "{2}". Arguments:{3!r}.'.format(
                            type(e).__name__,
                            sys.exc_info()[-1].tb_lineno,
                            expr_sp,
                            e.args,
                        )
                    )
                    continue

                system.append(expr)
                if i == 0:
                    A = valA
                    B = valB
                else:
                    A = np.vstack((A, valA))
                    B = np.vstack((B, valB))
                i += 1
            if self.skip_zero_gradient:
                skip = False
                for i in range(degree):
                    if not A[:, [i]].any():
                        skip = True
                        break
                for i in range(p):
                    if not B[:, [i]].any():
                        skip = True
                        break
                if skip:
                    continue
            try:
                C = ctrl.ctrb(A, B)
                d = degree - np.linalg.matrix_rank(C, 1.0e-6)
                if d != 0 and (skip_unstable or self.prob_positive > 0.0):
                    continue
                if self.predict_gramian and d == 0:
                    # C = ctrl.lyap(A, - B @ B.T)
                    # K = - B.T @ np.linalg.inv(C)
                    A = A / np.linalg.norm(A)
                    B = B / np.linalg.norm(A)
                    tau = 1
                    yint = []
                    # We want to integrate a matrix over [0,tau]
                    # and all the integrate functions I found are for scalars.
                    # So we do it term by term
                    for i in range(degree):  # divide in row
                        yint_line = []
                        for j in range(degree):  # divide in column

                            dt = np.linspace(
                                0, tau, num=40
                            )  # integration path [0,tau] and 40 points
                            yint0 = []
                            for k in range(len(dt)):
                                # vector i with the component to be integrated (i,j),
                                # evaluated at each point of the integration path
                                res = (
                                    (expm(A * (tau - dt[k])))
                                    @ (B @ B.T)
                                    @ (expm(A.T * (tau - dt[k])))
                                )[i]
                                yint0.append(
                                    res[j]
                                )  # vector of the component (i,j) along itegration path
                            resline = (cumtrapz(yint0, dt, initial=0))[
                                len(dt) - 1
                            ]  # integration with cumulative trapezz
                            yint_line.append(resline)  # reconstruct the line
                        yint.append(yint_line)  # reconstruct the matrix
                    if np.isnan(yint).any() or np.isinf(yint).any():
                        continue
                    Ctau = (
                        expm(-tau * A) @ np.array(yint) @ expm(-tau * A.T)
                    )  # From the gramian to the true C
                    if np.isnan(Ctau).any() or np.isinf(Ctau).any():
                        continue
                    K = -B.T @ (np.linalg.inv(Ctau + 1e-6 * np.eye(degree)))
                    if np.isnan(K).any() or np.isinf(K).any():
                        continue

                    with np.nditer(K, op_flags=["readwrite"]) as it:
                        for x in it:
                            x[...] = float(f"%.{self.jacobian_precision}e" % x)

                    if max(np.linalg.eigvals(A + B @ K).real) > 0:
                        # Check that A+B@K is stable, which is equivalent to
                        # check_gramian
                        # print("UNSTABLE")
                        continue

            except Exception:
                # logger.error("An unknown exception of type {0} occurred
                # in line {1} for expression \"{2}\". Arguments:{3!r}.".format(
                # type(e).__name__, sys.exc_info()[-1].tb_lineno, expr_sp, e.args))
                continue
            break
        # # debug
        # logger.info(str(cspeed))
        # logger.info(str(cspeed) + "\t" + " ||||| ".join(str(s) for s in system[:3]))
        # print(degree, str(ngen) + " : " + str((ngen - degree) / ngen * 100.0))

        # encode input
        x = self.write_int(degree)
        for s in system:
            x.append(self.func_separator)
            x.extend(self.encode_expr(s))

        # encode output: dimension of control subspace and optionally the Gramian matrix
        if self.qualitative:
            controlable = 1 if d == 0 else 0
            y = self.write_int(controlable)
        else:
            y = self.write_int(d)
            if self.predict_gramian and d == 0:
                K = np.array(K)
                y.append(self.mtrx_separator)
                for row in K:
                    for value in row:
                        y.extend(self.write_complex(value, self.jacobian_precision))
                        y.append(self.list_separator)
                    y.append(self.line_separator)

        if self.max_len > 0 and (len(x) >= self.max_len or len(y) >= self.max_len):
            return None

        if return_system:
            return x, y, system, p
        else:
            return x, y