common_components/monitoring/json_to_dot.py (154 lines of code) (raw):
#!/usr/bin/python3
# Copyright 2022 The Reg Reporting Blueprint Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Python script for converting a list of performance nodes into a DOT graph.
Expected list of nodes is in JSON of the format:
- List of objects
- Each object having the following:
- name - name of the node
- depends_on - list of upstream dependencies (correponding to other
nodes)
- start_time - Javascript formatted timestamp when node started
- end_time - Javascript formatted timestamp when node ended
"""
import os.path
import datetime
import sys
import argparse
import json
def clean_name(name):
""" Turn a name into a valid dot node name """
return name.replace('.','__').replace("-", "__")
def trim_names(nodes):
""" Remove common prefixes from all graph names (DBT can be verbose!) """
names = [n['name'] for n in nodes.values()]
prefix = os.path.commonprefix(names)
if prefix:
for node in nodes.values():
node['name'] = node['name'][len(prefix):]
def find_critical_path(nodes):
""" Identify the critical path and mark it red
The critical path is the set of nodes whose time,
if shorter, would result in the entire graph being
having shorter term.
"""
def find_last_node(ids):
end_time = None
node_id = None
for node in ids:
if end_time is None or nodes[node]["end_time"] > end_time:
end_time = nodes[node]["end_time"]
node_id = node
return node_id
# Start with all nodes
upstream_nodes = nodes.keys()
while True:
# Find the last node to finish if possible
last_node = find_last_node(upstream_nodes)
if last_node is None:
break
# Mark as critical
nodes[last_node]['critical'] = True
# New upstream nodes
upstream_nodes = nodes[last_node]['depends_on']
def find_real(nodes):
""" Remove nodes that have no timing information
DBT can have nodes that have no timing information
and not actually run in the data warehouse (e.g.,
ephemeral queries). These are removed from the graph,
while preserving the overall dependency graph.
"""
# Assumes no cycles in the graph!
def expand_real(depends):
new_depends = set()
for dep in depends:
# Source. We should have these in the graph!
if not dep in nodes:
continue
if nodes[dep]["start_time"]:
new_depends.add(dep)
elif "real_depends_on" in nodes[dep]:
new_depends.update(nodes[dep]["real_depends_on"])
else:
new_depends.update(expand_real(nodes[dep]["depends_on"]))
return new_depends
# Find the real depends_on
for node in nodes.values():
node["real_depends_on"] = expand_real(node["depends_on"])
# Shift to actual depends_on
for node in list(nodes.keys()):
if not nodes[node]['start_time']:
del nodes[node]
continue
nodes[node]['depends_on'] = nodes[node]['real_depends_on']
del nodes[node]['real_depends_on']
def get_elapsed_time(secs):
""" Turn absolute seconds into a readable time format """
secs = int(secs)
hours = secs // 3600
secs -= (hours * 3600)
mins = secs // 60
secs -= (mins * 60)
if hours > 0:
return f'{hours:2d}:{mins:02d}:{secs:02d}'
if mins > 0:
return f'{mins:2d}:{secs:02d}'
return f'{secs:2d}'
def json_to_dot_dict(data):
""" Convert incoming JSON node list into a set of
python formatted dictionary.
"""
nodes = {}
for node in data:
name = clean_name(node['name'])
nodes[name] = {
'name': node['name'],
'start_time': None,
'end_time': None,
'depends_on': [clean_name(x) for x in node['depends_on']],
}
if node['end_time']:
nodes[name]['end_time'] = (datetime.datetime
.fromisoformat(node['end_time'])
.timestamp())
if node['start_time']:
nodes[name]['start_time'] = (datetime.datetime
.fromisoformat(node['start_time'])
.timestamp())
return nodes
def render_dot(nodes,
left_right=False,
seconds_per_block=1,
fontsize=24,
debug=False):
"""Render graph into a DOT graph
Parameters
----------
nodes : Nodes
Node graph for rendering
left_right : bool
Whether to use left to right or top to bottom for query timelines
seconds_per_block : int
Number of seconds per block size
fontsize : int
Font size for nodes
debug : bool
Whether to produce a DOT graph that is debuggable
"""
start_time = min([v["start_time"] for v in nodes.values() if v["start_time"]])
end_time = max([v["end_time"] for v in nodes.values() if v["end_time"]])
def find_block(time_stamp):
return int(time_stamp - start_time) // seconds_per_block
print("digraph {\n")
print('newrank=true;')
print('compound=true;')
print(f'rankdir="{"LR" if left_right else "TB"}";')
print('ranksep="equally";')
print('overlap="false";')
print(f'node [fontsize={fontsize}, shape = plaintext];')
print('\n/* Timeline */\n')
print('edge [style="invis"];')
for blk in range(find_block(end_time)):
print(f"{blk} -> {blk+1}")
print(f"{blk} [label=\"{get_elapsed_time(blk * seconds_per_block)}\"] ")
print('\n/* Main Styling */\n')
print('edge [style="solid"];')
if debug:
print('node [shape="box"];')
print('\n/* Main Data */\n')
for node_id, node in nodes.items():
print(f"subgraph cluster_{node_id} {{")
print(f' bgcolor="{"red" if "critical" in node else "steelblue1"}"')
print(" style=\"rounded\"")
# Find start and end time block for alignment
start_align = find_block(node["start_time"])
end_align = find_block(node["end_time"]) - 1
if start_align < end_align:
print(f" {node_id}_start [fontsize={fontsize}, label=\"{node['name']}\" width=0]")
print(f" {node_id} [style=\"invis\" label=\"{node['name']}\"]")
else:
print(f" {node_id} [fontsize={fontsize}, label=\"{node['name']}\"]")
print("}")
if start_align < end_align:
print(f"{{ rank=same; {start_align} {node_id}_start }}")
print(f"{{ rank=same; {end_align} {node_id} }}")
else:
print(f"{{ rank=same; {start_align} {node_id} }}")
# Upstream dependencies
for depends in node["depends_on"]:
print(f"{depends} -> {node_id}{'_start' if start_align < end_align else ''} " +
f"[ltail=\"cluster_{depends}\" lhead=\"cluster_{node_id}\"]")
print("}\n")
def main():
""" Main for converting a dependency graph into a graphviz DOT formatted graph"""
parser = argparse.ArgumentParser(description='Convert JSON performance graph to DOT graph')
parser.add_argument('json_graph', nargs='?', type=argparse.FileType('r'),
default=sys.stdin)
parser.add_argument('dot_graph', nargs='?', type=argparse.FileType('w'),
default=sys.stdout)
parser.add_argument('--lr',
help='Left to right graph instead of top to bottom',
action='store_true')
parser.add_argument('--seconds_per_block',
type=int, default=1,
help='Seconds per incremental in graph output')
parser.add_argument('--debug',
help='Enable debug style DOT graph',
action='store_true')
args = parser.parse_args()
# Read in JSON
data = json.load(sys.stdin)
# Convert to a dictionary from a list
nodes = json_to_dot_dict(data)
# Find real nodes, trim names
find_real(nodes)
trim_names(nodes)
find_critical_path(nodes)
# Reader as a DOT graph
render_dot(nodes,
left_right=args.lr,
seconds_per_block=args.seconds_per_block,
debug=args.debug)
if __name__ == "__main__":
main()