visualize_rules_graph.py (156 lines of code) (raw):
import argparse
import os
import re
import toml
import graphviz
from dataclasses import dataclass
# REQUIRED: pip install toml
# REQUIRED: pip install graphviz
@dataclass(frozen=True)
class Edge:
    """
    One outgoing edge of the rules graph.

    Instances are immutable and compare/hash by value (`eq` defaults to True for dataclasses).
    The source node is not stored here: edges are kept in lists keyed by their source node.
    """
    # sanitized name of the rule/group the edge points to
    to: str
    # sanitized scope of the edge; passed to graphviz as the edge label
    scope: str
def collect_rules_groups_edges():
    """
    Loads the TOML configuration found in every directory of `args.configurations_path`.

    For each directory, `rules.toml` is parsed and passed to `collect_rules_and_groups`.
    If the directory also contains an `edges.toml`, it is parsed and passed to `collect_edges`.
    """
    for config_path in args.configurations_path:
        config_dir = os.path.abspath(config_path)

        # rules.toml should exist, deliberately fail otherwise
        rules_file = os.path.join(config_dir, 'rules.toml')
        collect_rules_and_groups(toml.load(rules_file))

        # edges are optional for input rules (i.e., non built-in)
        edges_file = os.path.join(config_dir, 'edges.toml')
        if os.path.exists(edges_file):
            collect_edges(toml.load(edges_file))
def collect_rules_and_groups(rules_toml_dict):
    """
    Collects rules and groups to further build graph nodes.

    For every entry under the `rules` key of `rules_toml_dict`:
    - the rule name is sanitized (whitespace -> `_`);
    - a rule without a `query` key is recorded in `dummy_nodes`;
    - a rule with a non-empty `groups` list is forwarded to `collect_node_for_rule_with_group`;
    - a rule with no `groups` key, or with an empty `groups` list, is recorded in
      `nodes_without_groups` so that it still gets a standalone node.
    """
    for rule_toml in rules_toml_dict['rules']:
        rule_name: str = sanitize_name(rule_toml['name'])
        if 'query' not in rule_toml:
            dummy_nodes.add(rule_name)

        # truthiness check on purpose: `groups = []` means "no group" as well,
        # otherwise such a rule would not be recorded in any collection
        rule_groups = rule_toml.get('groups')
        if rule_groups:
            collect_node_for_rule_with_group(rule_name, rule_groups)
        else:
            nodes_without_groups.add(rule_name)
def collect_node_for_rule_with_group(rule_name: str, rule_groups: 'list[str]'):
    """
    Registers `rule_name` under each of its groups.

    Group names are sanitized first. The `Cleanup_Rule` group only marks the rule as a
    cleanup rule (`cleanup_rules`); every other group gets the rule appended to its list
    in `rules_by_group_dict`.
    """
    for raw_group_name in rule_groups:
        group = sanitize_name(raw_group_name)
        if group == 'Cleanup_Rule':
            # we don't want to group `Cleanup Rule`s under the same graphviz shape
            cleanup_rules.add(rule_name)
            continue
        rules_by_group_dict.setdefault(group, []).append(rule_name)
def sanitize_name(s: str) -> str:
    """
    Graphviz does not like names with spaces.

    Every run of one or more whitespace characters is replaced by a single '_'.
    """
    return '_'.join(re.split(r"\s+", s))
def collect_edges(edges_toml_dict):
    """
    Groups outgoing edges by rule/group.

    Each entry under the `edges` key has a `from` name, a list of `to` names and a `scope`.
    One `Edge` per `to` name is appended to `outgoing_edges_by_node[from]`.
    All names are sanitized to replace empty spaces by `_`.
    """
    for entry in edges_toml_dict['edges']:
        source = sanitize_name(entry['from'])
        targets: 'list[str]' = entry['to']
        edge_scope = sanitize_name(entry['scope'])
        for target in targets:
            new_edge = Edge(to=sanitize_name(target), scope=edge_scope)
            # setdefault stays inside the loop: an entry with an empty `to` list
            # must not create a key for its source node
            outgoing_edges_by_node.setdefault(source, []).append(new_edge)
def initialize_graph() -> graphviz.Digraph:
    """
    Creates the (still empty) directed graph.

    The graph is saved to `output_file_path`, renders as svg, and shows `args.title`
    as a label at the top with font size 30.
    """
    return graphviz.Digraph(
        filename=output_file_path,
        format='svg',
        graph_attr={
            'label': str(args.title),
            'labelloc': 't',
            'fontsize': '30',
        },
    )
def generate_graph_nodes():
    """
    Adds all nodes to the graph, in two passes.

    The order matters: the second pass relies on `added_nodes` (filled by the first pass)
    to skip names that already have a node.
    """
    # first pass: groups, names with outgoing edges, and cleanup rules
    generate_nodes_with_groups_and_outgoing_edges()
    # rules *without* outgoing edges and no groups have not been added yet
    # this is because we focus on outgoing edges when traversing `edges.toml`
    generate_nodes_without_groups_and_no_outgoing_edges()
def generate_nodes_with_groups_and_outgoing_edges():
    """
    Adds a node for every name returned by `all_rule_names_with_groups_and_outgoing_edges()`.

    A name that is a key of `rules_by_group_dict` is a group and is rendered by
    `generate_node_for_group`; any other name is rendered by
    `generate_node_for_rule_not_under_a_group`.
    """
    for rule_name in all_rule_names_with_groups_and_outgoing_edges():
        if rule_name in rules_by_group_dict:
            # several (n >= 1) rules under the same graphviz `record` shape.
            generate_node_for_group(rule_name)
        else:
            # not a graphviz `record` node. single rule in the shape.
            generate_node_for_rule_not_under_a_group(rule_name)
        # remember the name so that later iterations/passes skip it
        # NOTE(review): assumed to run for both branches (i.e., group names are tracked too) - confirm
        added_nodes.add(rule_name)
def all_rule_names_with_groups_and_outgoing_edges() -> 'list[str]':
    """
    Returns the names to be added to the graph first, without duplicates, in this order:

    1) names with outgoing edges (insertion order of `outgoing_edges_by_node`)
    2) groups that have no outgoing edges (sorted)
    3) cleanup rules (sorted)

    Parts 2 and 3 come from sets; they are sorted because the iteration order of a set of
    strings may change between runs, which would change the generated .dot file.
    """
    ordered_names = list(outgoing_edges_by_node)
    seen = set(ordered_names)

    # set difference
    # map.keys() is a set view, it doesn't have set's methods but supports operators
    groups_without_outgoing_edges = sorted(rules_by_group_dict.keys() - seen)

    for name in groups_without_outgoing_edges + sorted(cleanup_rules):
        # e.g., a cleanup rule that also has outgoing edges was already listed in part 1
        if name not in seen:
            seen.add(name)
            ordered_names.append(name)
    return ordered_names
def generate_node_for_group(rule_name: str):
    """
    Adds a single `record` node for a group; `rule_name` is the (sanitized) group name.

    The label is the group name, an empty line, and then one line per rule in the group.
    Every rule of the group is recorded in `added_nodes`, so it gets no standalone node later.
    """
    members = rules_by_group_dict[rule_name]
    added_nodes.update(members)

    #############################
    # boolean_expression_simplify
    #
    # simplify_not_false
    # simplify_not_true
    # ...
    #############################
    member_lines = '\\n'.join(
        append_cleanup_rule_if_needed(member) for member in members)
    graph.node(rule_name, rule_name + '\\n\\n' + member_lines, shape='record')
def append_cleanup_rule_if_needed(rule_name: str) -> str:
    """
    Returns the text shown for a rule listed under a group.

    Should be called for rules under a group.
    A cleanup rule gets ` (Cleanup Rule)` appended *on the same line*; any other rule is
    returned unchanged.

    Cleanup Rules are treated differently because:
    1) Currently, there are no edges to `Cleanup Rule` (as opposed to other groups)
    2) nodes may have another group with incoming/outgoing edges.
    We want to display rules under a group which indicates flow (i.e., has edges).
    At the same time, we still want to indicate in the graph that a rule is a cleanup rule.
    """
    return f'{rule_name} (Cleanup Rule)' if rule_name in cleanup_rules else rule_name
def generate_node_for_rule_not_under_a_group(rule_name: str):
    """
    Adds a standalone node for `rule_name`, unless it is already in `added_nodes`.

    - dummy node (rule without `query`): `doubleoctagon` shape
    - cleanup rule: default shape, with `(Cleanup Rule)` *on a new line* of the label
    - anything else: default shape and label

    The dummy check comes first, so a dummy node that is also a cleanup rule gets no extra label.
    This function does not update `added_nodes`; the caller does.
    """
    if rule_name in added_nodes:
        return

    if rule_name in dummy_nodes:
        graph.node(rule_name, shape='doubleoctagon')
        return

    if rule_name in cleanup_rules:
        graph.node(rule_name, f'{rule_name}\\n(Cleanup Rule)')
        return

    graph.node(rule_name)
def generate_nodes_without_groups_and_no_outgoing_edges():
    """
    Adds a standalone node for every name in `nodes_without_groups` that has no node yet.

    These are nodes that don't have any group, not even 'Cleanup_Rule',
    and also no outgoing edge, i.e., leaf nodes with no groups -> standalone shape.
    """
    # sorted: the iteration order of a set of strings may change between runs,
    # which would change the order of the nodes in the generated .dot file
    for node in sorted(nodes_without_groups):
        # avoid adding a node already added through an edge
        if node not in added_nodes:
            added_nodes.add(node)
            graph.node(node)
def generate_graph_edges():
    """Adds every collected edge to the graph, using the edge's scope as its label."""
    for source, outgoing_edges in outgoing_edges_by_node.items():
        for outgoing_edge in outgoing_edges:
            graph.edge(source, outgoing_edge.to, label=outgoing_edge.scope)
###########
# Arguments
# Shown after the argument list in `--help` (epilog).
example_text = '''For a complete output, the script needs the directory with the built-in rules for a given language. Example:
python visualize_rules_graph.py ./ff_cleanup.dot src/cleanup_rules/java demo/feature_flag_cleanup/java/configurations --title "Feature Flag Cleanup"
Experimental:
The generated graph may end very wide if you have several rules with no outgoing edges.
You can experiment passing the `--unflatten` option and changing the values of `--stagger` (https://graphviz.readthedocs.io/en/stable/manual.html#unflatten).
Another option is to manually edit the generated .dot file to include invisible edges (https://stackoverflow.com/a/11136488/1008952).
'''
# Shown before the argument list in `--help`.
# The install commands use `pip` (`python install <package>` is not a valid command).
description_text = '''Script to output a .dot graph and svg image for visualizing how rules/groups are connected.
Please install the `toml` PyPi package: `pip install toml`.
Please install the `graphviz` PyPi package: `pip install graphviz`.
The script assumes that rules will have at most [0,2] groups. If a rule has two groups, one will be `Cleanup Rule`. The visualization will likely break if any rule has > 2 groups.'''
# RawDescriptionHelpFormatter keeps the line breaks of the description and epilog texts.
arg_parser = argparse.ArgumentParser(
    description=description_text,
    epilog=example_text,
    formatter_class=argparse.RawDescriptionHelpFormatter)
arg_parser.add_argument('output_file_path', type=str,
                        help="Path and file name/extension for the output file.")
arg_parser.add_argument('configurations_path', type=str, nargs='+',
                        help="One or more root directory that contains 'rules.toml' and 'edges.toml'")
arg_parser.add_argument('--title', nargs='?', default='',
                        help='Optional title for the graph')
arg_parser.add_argument('--unflatten', action='store_true', default=False,
                        help='Experimental: unflatten the graph before rendering it')
arg_parser.add_argument('--stagger', nargs='?', default=2, const=2, type=int,
                        help='Experimental: stagger value used with --unflatten (default: 2)')
# parsed at import time; the functions in this file read `args` as a global
args = arg_parser.parse_args()
###########
# Execution
# Module-level state filled by the collect_* functions and read by the generate_* functions.
# group name -> names of the rules in that group (the `Cleanup_Rule` group is not stored here)
rules_by_group_dict: 'dict[str, list[str]]' = {}
# source rule/group name -> its outgoing edges
outgoing_edges_by_node: 'dict[str, list[Edge]]' = {}
# rules that belong to the `Cleanup_Rule` group
cleanup_rules: 'set[str]' = set()
# rules with no group at all
nodes_without_groups: 'set[str]' = set()
# nodes without `query`
dummy_nodes: 'set[str]' = set()

output_file_path = os.path.abspath(args.output_file_path)
if os.path.isdir(output_file_path):
    raise ValueError(
        'output_file_path (first arg) should be a file, not a directory')
output_dir_path = os.path.dirname(output_file_path)
# exist_ok=True replaces the separate "exists?" check and its check-then-create race
os.makedirs(output_dir_path, exist_ok=True)

# 1) read every rules.toml/edges.toml into the collections above
collect_rules_groups_edges()

# 2) build the graph: all nodes first, then the edges between them
graph = initialize_graph()
# names that already have a node (or are listed inside a group node)
added_nodes: 'set[str]' = set()
generate_graph_nodes()
generate_graph_edges()

# 3) save and render, optionally unflattening the graph first
if args.unflatten:
    graph.unflatten(stagger=args.stagger).render()
else:
    graph.render()