aiops/MicroAgents/layers/system/system_analyzer.py (182 lines of code) (raw):
import os
from typing import Callable, Dict, Literal, Optional, Union
from functools import partial
from agents import AgentBase, ExpertAgent, ModuleAgent
import networkx as nx
from ..utils import get_k_order_neighbors_with_direction
import copy
from logger.logger import Logger
import asyncio
class SystemAnalyzer:
    """Coordinate one ``ModuleAgent`` per module to diagnose a multi-module system.

    Diagnoses are exchanged between agents along a module dependency graph in
    one of several organization modes; see :meth:`analyze`.
    """

    def __init__(self, modules, model, system_instructions, tools=None, human_input_mode=False,
                 intervention_mode: Literal["COVER", "APPEND"] = "COVER"):
        """Create one module agent per module plus a human expert agent.

        Args:
            modules (dict): Mapping of module key -> module dict. Each module dict
                must contain "module_name"; it is passed unchanged to ``ModuleAgent``.
                Agents are registered in ``self.agents_name`` under "module_name"
                (assumed to match the graph node ids used by the analyze methods —
                confirm against callers).
            model: The model handed to every ``ModuleAgent``.
            system_instructions (str): System prompt for every module agent.
            tools: Tools handed to every module agent (default ``None``).
            human_input_mode (bool): If True, :meth:`analyze` asks the human expert
                to review every non-empty diagnosis.
            intervention_mode ("COVER" | "APPEND"): How a human response is merged,
                compared case-insensitively. "COVER" replaces the diagnosis;
                "APPEND" appends the expert's analysis to it.
        """
        self.modules = modules
        self.agents = []
        self.agents_name = {}
        self.human_input_mode = human_input_mode
        self.intervention_mode = intervention_mode
        self.human_expert = ExpertAgent(name="human expert", human_input_mode=human_input_mode)
        # Misspelled alias kept so existing external references keep working.
        self.huamn_expert = self.human_expert
        for module in self.modules.values():
            module_analyzer = ModuleAgent(name=f"{module['module_name']} agent", tools=tools, module=module,
                                          model=model, system_instructions=system_instructions)
            self.agents.append(module_analyzer)
            self.agents_name[module["module_name"]] = module_analyzer

    def analyze(self, graph, modules,
                organization_mode: Literal["forward", "backward", "forward_and_backward", "convolution", "isolation", "full"] = "backward",
                k: int = 1, layers=1):
        """Diagnose the root cause of anomalies across the whole system.

        The module dependency graph can be traversed forward, backward,
        forward-then-backward, or convolution-style. Convolution-style is a
        graph-diffusion pass, similar to a GNN layer, that considers both
        predecessors and successors.

        Args:
            graph: networkx directed graph of module dependencies. It must be
                acyclic, because ``nx.topological_sort`` is used.
            modules (dict): Mapping of node -> module dict with keys
                "module_name", "module_function", "symptom", "module_dependency".
            organization_mode: Traversal strategy.
                "forward", "backward", "isolation" and "full" run one
                :meth:`directional_analyze` pass.
                "forward_and_backward" runs :meth:`forward_and_backward_analyze`.
                "convolution" runs :meth:`convolution_analyze`.
            k (int): Neighbor order considered by each agent.
            layers (int): Number of repetitions for "forward_and_backward" and
                "convolution". The single-pass modes ignore it.

        Returns:
            dict: Mapping of node -> diagnosis, after the optional human review.

        Raises:
            NotImplementedError: If ``organization_mode`` is not supported.
        """
        if organization_mode in ("forward", "backward", "isolation", "full"):
            decisions = self.directional_analyze(graph, modules, direction=organization_mode, decisions=None, k=k)
        elif organization_mode == "forward_and_backward":
            decisions = self.forward_and_backward_analyze(graph, modules, k=k, layers=layers)
        elif organization_mode == "convolution":
            decisions = self.convolution_analyze(graph, modules, k=k, layers=layers)
        else:
            raise NotImplementedError(f"organization mode {organization_mode} is not implemented !!!")
        if self.human_input_mode:
            self._apply_human_review(decisions)
        return decisions

    def _apply_human_review(self, decisions):
        """Let the human expert review each non-empty diagnosis, updating ``decisions`` in place."""
        for node in list(decisions):
            if not decisions[node]:
                continue
            human_response = self.human_expert.reply(decisions[node])
            if not human_response:
                continue
            mode = self.intervention_mode.lower()
            if mode == 'cover':
                decisions[node] = human_response
            elif mode == 'append':
                decisions[node] += f"\nHuman expert's analysis: {human_response}"

    def filter_nodes(self, graph, node_list, node_anomaly_map):
        """Keep only the nodes whose own module reports a (truthy) symptom.

        Only the node's own "symptom" is checked; neighbors are not consulted.
        Nodes without a symptom are therefore never sent to an agent by the
        directional passes.

        Args:
            graph: Unused; kept for interface compatibility.
            node_list (list): Nodes in the desired processing order.
            node_anomaly_map (dict): Mapping of node -> module dict with a "symptom" key.

        Returns:
            list: The symptomatic nodes, in their original order.
        """
        return [node for node in node_list if node_anomaly_map[node]["symptom"]]

    @staticmethod
    def _seed_decisions(modules):
        """Return an initial decisions dict mapping every symptomatic node to its raw symptom."""
        return {node: module["symptom"] for node, module in modules.items() if module["symptom"]}

    def _ordered_nodes(self, graph, modules, direction):
        """Return the symptomatic nodes in topological order, reversed for "backward"."""
        sorted_nodes = list(nx.topological_sort(graph))
        if direction == "backward":
            sorted_nodes.reverse()
        return self.filter_nodes(graph, sorted_nodes, modules)

    def _populate_logger(self, logger, graph, modules, node, direction, k, cur_decisions):
        """Fill ``logger`` with the prompt context for ``node``.

        Sets "cur_node", "symptom", "relations", "direction" and "neighbors".
        "neighbors" lists every neighbor of order 1..k that has a non-empty
        diagnosis in ``cur_decisions``.

        - "isolation": no neighbors are collected.
        - "full": every graph node, including ``node`` itself, counts as a
          first-order neighbor, and ``k`` is forced to 1.
        - Other directions: neighbors come from
          ``get_k_order_neighbors_with_direction``, which is indexed here by
          order 1..k.
        """
        logger['cur_node'] = node
        logger['symptom'] = modules[node]["symptom"]
        logger['relations'] = modules[node]["module_dependency"]
        logger['direction'] = direction
        logger['neighbors'] = []
        if direction == "isolation":
            return
        if direction == "full":
            k = 1
            neighbor_nodes = {1: list(graph.nodes())}
        else:
            neighbor_nodes = get_k_order_neighbors_with_direction(graph, node, k, direction)
        for order in range(1, k + 1):
            order_neighbors = neighbor_nodes[order]
            if not order_neighbors:
                continue
            for neighbor in list(order_neighbors):
                diagnosis = cur_decisions.get(neighbor)
                if diagnosis:
                    logger['neighbors'].append({"module_name": modules[neighbor]["module_name"],
                                                "module_function": modules[neighbor]["module_function"],
                                                "diagnosis_result": diagnosis})

    async def a_directional_analyze(self, graph, modules,
                                    direction: Literal["forward", "backward", "bidirectional", "isolation", "full"] = "forward",
                                    decisions=None, k: int = 1, batch_size: int = 4):
        """Async variant of :meth:`directional_analyze` that queries agents in concurrent batches.

        All prompts are built first, from the neighbor diagnoses available
        before the pass. Replies are then requested ``batch_size`` at a time
        with ``asyncio.gather`` and written into ``decisions``. Unlike the
        synchronous version, a "forward" or "backward" pass therefore does NOT
        feed a node's new diagnosis to later nodes in the same pass. Only the
        "bidirectional" semantics are reproduced exactly.

        Args:
            graph: networkx DAG of module dependencies.
            modules (dict): Mapping of node -> module dict with keys
                "module_name", "module_function", "symptom", "module_dependency".
            direction: Traversal direction; see :meth:`_populate_logger`.
            decisions (dict | None): Diagnoses from a previous pass. If None,
                they are seeded from the modules' symptoms. Updated in place.
            k (int): Neighbor order to consider.
            batch_size (int): Maximum number of concurrent agent requests.

        Returns:
            dict: ``decisions``, updated with this pass's diagnoses.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        if decisions is None:
            decisions = self._seed_decisions(modules)
        # Bidirectional passes read from a snapshot so the whole layer sees the previous layer.
        cur_decisions = copy.deepcopy(decisions) if direction == "bidirectional" else decisions
        pending = []
        for node in self._ordered_nodes(graph, modules, direction):
            logger = Logger()  # one per node: all prompts are kept until the batches run
            self._populate_logger(logger, graph, modules, node, direction, k, cur_decisions)
            if modules[node]["symptom"] or logger['neighbors']:
                pending.append(logger)
            else:
                decisions.update({node: ""})
        for start in range(0, len(pending), batch_size):
            batch = pending[start:start + batch_size]
            responses = await asyncio.gather(*[
                self.agents_name[item['cur_node']].areply(item.to_message_text(direction)) for item in batch
            ])
            for response, logger in zip(responses, batch):
                print('-'*25, f'{logger["cur_node"]}', '-'*25)
                logger.print()
                print(f'Diagnosis result of the module {logger["cur_node"]} in the current layer with the {direction} direction:\n', response)
                decisions.update({logger['cur_node']: response})
        return decisions

    def directional_analyze(self, graph, modules,
                            direction: Literal["forward", "backward", "bidirectional", "isolation", "full"] = "forward",
                            decisions=None, k: int = 1):
        """Run one pass of module agents over the dependency graph.

        Symptomatic nodes are visited in topological order ("backward" uses
        the reverse order). Each node's agent is prompted with its own symptom
        and the diagnoses of its neighbors.

        Args:
            graph: networkx DAG of module dependencies.
            modules (dict): Mapping of node -> module dict with keys
                "module_name", "module_function", "symptom", "module_dependency".
            direction: Traversal direction; see :meth:`_populate_logger`.
                For "bidirectional", neighbor diagnoses are read from a snapshot
                taken before the pass. For the other directions, the live dict
                is read, so diagnoses propagate along the traversal order
                within the pass.
            decisions (dict | None): Diagnoses from a previous pass. If None,
                they are seeded from the modules' symptoms. Updated in place.
            k (int): Neighbor order to consider.

        Returns:
            dict: ``decisions``, updated with this pass's diagnoses.
        """
        if decisions is None:
            decisions = self._seed_decisions(modules)
        cur_decisions = copy.deepcopy(decisions) if direction == "bidirectional" else decisions
        sorted_nodes = self._ordered_nodes(graph, modules, direction)
        # A single Logger is reused; _populate_logger resets every field it writes for each node.
        logger = Logger()
        for node in sorted_nodes:
            self._populate_logger(logger, graph, modules, node, direction, k, cur_decisions)
            if modules[node]["symptom"] or logger['neighbors']:
                print('-'*25, f'{node}', '-'*25)
                logger.print()
                response = self.agents_name[node].reply(logger.to_message_text(direction))
                print(f'Diagnosis result of the module {node} in the current layer with the {direction} direction:\n', response)
            else:
                response = ""
            decisions.update({node: response})
        return decisions

    def forward_and_backward_analyze(self, graph, modules, k=1, layers=1):
        """Run ``layers`` rounds of a forward pass followed by a backward pass.

        Args:
            graph: networkx DAG of module dependencies.
            modules (dict): Mapping of node -> module dict with keys
                "module_name", "module_function", "symptom", "module_dependency".
            k (int): Neighbor order to consider.
            layers (int): Number of forward+backward rounds.

        Returns:
            dict: Mapping of node -> diagnosis.
        """
        decisions = self._seed_decisions(modules)
        for _ in range(layers):
            for direction in ("forward", "backward"):
                decisions = self.directional_analyze(graph, modules, direction=direction, decisions=decisions, k=k)
        return decisions

    def convolution_analyze(self, graph, modules, k=1, layers=1):
        """Run ``layers`` bidirectional passes, analogous to stacked graph-convolution layers.

        Each layer looks at both predecessors and successors, using the
        diagnoses produced by the previous layer.

        Args:
            graph: networkx DAG of module dependencies.
            modules (dict): Mapping of node -> module dict with keys
                "module_name", "module_function", "symptom", "module_dependency".
            k (int): Neighbor order to consider.
            layers (int): Number of bidirectional passes.

        Returns:
            dict: Mapping of node -> diagnosis.
        """
        decisions = self._seed_decisions(modules)
        for layer in range(layers):
            print('='*25, f'layer {layer}', '='*25)
            # NOTE: a_directional_analyze is a batched async alternative (run it via
            # asyncio.run); it is not enabled here.
            decisions = self.directional_analyze(graph, modules, direction="bidirectional", decisions=decisions, k=k)
        return decisions