def directional_analyze()

in aiops/MicroAgents/layers/system/system_analyzer.py [0:0]


    def directional_analyze(
        self,
        graph,
        modules,
        direction: Literal["forward", "backward", "bidirectional", "isolation", "full"] = "forward",
        decisions=None,
        k: int = 1,
    ):
        """Run a single diagnosis pass over the module dependency graph.

        Nodes are visited in topological order (reversed when ``direction`` is
        "backward") after being filtered by ``self.filter_nodes``. For every
        visited node, the non-empty decisions of its neighbors are collected
        and sent, together with the node's own symptom, to the node's agent
        (``self.agents_name[node]``); the agent's reply becomes the node's
        decision. A node that has neither a symptom nor any neighbor
        diagnosis is not sent to its agent and gets an empty-string decision.

        Args:
            graph: A networkx graph describing the dependencies between
                modules. It must be acyclic, because the visiting order comes
                from ``nx.topological_sort``.
            modules (dict): Maps each node to a dict describing the module.
                The keys read here are "module_name", "module_function",
                "symptom" and "module_dependency".
            direction: How neighbor information is propagated.
                "forward" / "backward": neighbors are resolved by
                ``get_k_order_neighbors_with_direction`` with that direction;
                "backward" additionally reverses the visiting order.
                "bidirectional": neighbor decisions are read from a snapshot
                taken before the pass, so updates made during the pass are
                not visible to other nodes within the same pass.
                "isolation": no neighbor information is used.
                "full": every node of the graph (the current node included)
                is treated as a first-order neighbor and ``k`` is ignored.
            decisions (dict | None): Existing decisions keyed by node. When
                ``None``, it is seeded from the modules' non-empty symptoms.
                When given, it is updated in place.
            k (int): Consider neighbors up to the k-th order.

        Returns:
            dict: ``decisions``, holding one entry per visited node.
        """
        if decisions is None:
            # Seed the decisions with the raw symptoms of the affected modules.
            decisions = {
                node: modules[node]["symptom"]
                for node in modules
                if modules[node]["symptom"]
            }

        # Bidirectional propagation works like a graph-convolution layer: all
        # nodes read the previous layer's decisions, so reads go to a snapshot
        # while writes go to `decisions`. Every other mode reads live values.
        if direction == "bidirectional":
            cur_decisions = copy.deepcopy(decisions)
        else:
            cur_decisions = decisions

        sorted_nodes = list(nx.topological_sort(graph))
        if direction == "backward":
            sorted_nodes.reverse()
        sorted_nodes = self.filter_nodes(graph, sorted_nodes, modules)

        if direction == "full":
            # In "full" mode all nodes form a single first-order neighborhood.
            k = 1

        logger = Logger()
        for node in sorted_nodes:
            module = modules[node]
            logger['cur_node'] = node
            logger['symptom'] = module["symptom"]
            logger['relations'] = module["module_dependency"]
            logger['direction'] = direction
            logger['neighbors'] = []

            agent = self.agents_name[node]

            # "isolation" never consults neighbors, so the lookup is skipped.
            if direction != "isolation":
                if direction == "full":
                    neighbor_nodes = {1: list(graph.nodes())}
                else:
                    neighbor_nodes = get_k_order_neighbors_with_direction(graph, node, k, direction)

                for order in range(1, k + 1):
                    order_neighbors = neighbor_nodes[order]
                    if not order_neighbors:
                        continue
                    for neighbor in order_neighbors:
                        diagnosis = cur_decisions.get(neighbor)
                        if not diagnosis:
                            # Neighbors without a diagnosis carry no information.
                            continue
                        logger['neighbors'].append({
                            "module_name": modules[neighbor]["module_name"],
                            "module_function": modules[neighbor]["module_function"],
                            "diagnosis_result": diagnosis,
                        })

            if module["symptom"] or logger['neighbors']:
                print('-'*25, f'{node}', '-'*25)
                logger.print()
                response = agent.reply(logger.to_message_text(direction))
                print(f'Diagnosis result of the module {node} in the current layer with the {direction} direction:\n', response)
            else:
                # Nothing to diagnose: no symptom and no neighbor evidence.
                response = ""
            decisions[node] = response

        return decisions