uberpoet/dotreader.py (143 lines of code) (raw):
# Copyright (c) 2018 Uber Technologies, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import itertools
import logging
import os
import tempfile
from collections import defaultdict
from pprint import pprint
from typing import Dict, List, Set # noqa: F401
from .moduletree import ModuleNode
from .util import makedir
class DotFileReader(object):
    """
    This class reads a dot file from a `buck query "deps(target)" --dot > file.gv` output
    and translates it into a `ModuleNode` dependency graph for the `BuckProjectGenerator` class
    to consume. The entry point is `DotFileReader().read_dot_file(path, root_node_name)`.

    Why make our own dot file parser vs using the two other python libraries available? They're
    really slow. Probably because of some extra stuff we don't do.
    """

    def __init__(self, modules_filter=None):
        # type: (List[str]) -> None
        """
        Initializes object.

        :param modules_filter: If a graph edge contains a string inside the list as a substring, it is ignored.
        Keep it empty to not filter out anything. If not specified, a default list of test names are used.
        Edge lines are lower-cased before matching, so entries should be lower case.
        """
        if modules_filter is None:
            # Why filter out modules with these names?
            # We are trying to simulate a non-test app build and ignore non-code targets such as asset
            # catalogs and schemes. Each module currently gets a static amount of code created for it,
            # so non-code modules will add code to the total when they shouldn't if we didn't filter them out.
            modules_filter = ['test', 'scheme', 'assetcatalog', 'resources', 'fixture', 'needle']
        self.modules_filter = modules_filter

    def read_dot_file(self, path, root_node_name, is_debug=False):
        # type: (str, str, bool) -> (ModuleNode, List[ModuleNode])
        """
        Reads a BUCK dependency dump in a dot/gv file at `path` and returns a `ModuleNode`
        graph root and list of nodes to generate a mock app from it.

        :param path: Path to the dot file
        :param root_node_name: The name of the root application node in the dependency graph
        :param is_debug: Enable this to dump some intermediate objects to help with debugging
        :return: A tuple of the root node of the tree and a list of all nodes in the tree
        :raises ValueError: If two different buck target paths share the same target name
        """
        with open(path, 'r') as f:
            text = f.read()

        raw_edges = self.extract_edges(text)
        # Collisions must be detected on the raw (full path) names, before they are shortened.
        ident_names = self.identical_names(raw_edges)
        edges = self.clean_edge_names(raw_edges)
        dep_map = self.make_dep_map_from_edges(edges)  # A dep_map is really an outgoing edge map

        # Debug dumps of dot reader state for debugging
        if is_debug:
            incoming_map = self.incoming_edge_map_from_dep_map(dep_map)
            anon_edge = self.anonymize_edge_names(edges, root_node_name)
            self.debug_dump([edges, raw_edges, anon_edge], [dep_map, ident_names, incoming_map])

        if ident_names:
            logging.error("Found identical buck target names in dot file: %s", path)
            logging.error(str(ident_names))
            raise ValueError("Dot file contains buck target names that are identical, but have different paths")

        root, nodes = self.mod_graph_from_dep_map(dep_map, root_node_name)
        logging.debug("%s %s total nodes: %d", root, root.deps, len(nodes))
        return root, nodes

    def extract_edges(self, text):
        # type: (str)-> List[List[str]]
        """
        Converts dot file text with buck targets as edges into a simpler [[string, string]] list.
        Also filters out unwanted target types based on the names in self.modules_filter.

        Dot files are basically lines of `"string" -> "string";` that represent a list of edges in a
        graph. Lines without a `->` (graph header, closing brace, ...) are skipped.
        """

        def is_wanted(f_line):
            # Only edge lines are interesting, and only those mentioning no filtered keyword.
            if '->' not in f_line:
                return False
            lower = f_line.lower()
            return not any(k in lower for k in self.modules_filter)

        def edge(f_line):
            # Drop surrounding whitespace and the statement terminator instead of blindly
            # cutting the last character, so trailing spaces or a missing `;` are harmless.
            return f_line.strip().rstrip(';').split('->')

        def name(f_part):
            return str(f_part.strip().replace('"', ''))

        return [[name(part) for part in edge(line)] for line in text.splitlines() if is_wanted(line)]

    @staticmethod
    def extract_buck_target(text):
        # type: (str) -> str
        """
        Extracts the target name from a buck target path. Ex: //a/b/c:target -> target
        If the target is invalid (not exactly one `:`), just returns the original text.
        """
        parts = text.split(":")
        if len(parts) != 2:
            return text
        return parts[1]

    def clean_edge_names(self, edges):
        # type: (List[List[str]]) -> List[List[str]]
        """Makes edge names only be their buck target name"""
        return [[self.extract_buck_target(text) for text in pair] for pair in edges]

    @staticmethod
    def anonymize_edge_names(edges, main_app_module_name):
        # type: (List[List[str]], str) -> List[List[str]]
        """
        Makes edge names anonymous so you can send them to third parties without
        revealing the names of your modules.

        The main app module is renamed to `DotReaderMainModule`; every other name becomes
        `DotReaderLib<N>`, numbered in order of first appearance.
        """
        # A counter object (rather than a plain int) lets the inner function advance it
        # without needing to rebind a variable of the enclosing scope.
        lib_index = itertools.count()
        name_dict = {main_app_module_name: "DotReaderMainModule"}

        def name(orig):
            if orig not in name_dict:
                name_dict[orig] = 'DotReaderLib' + str(next(lib_index))
            return name_dict[orig]

        return [[name(left), name(right)] for left, right in edges]

    @staticmethod
    def make_dep_map_from_edges(edges):
        # type: (List[List[str]]) -> Dict[str, List[str]]
        """Converts a raw [(origin,destination)] edge list into a {origin:[destinations]} outgoing edge map."""
        dep_map = defaultdict(list)
        for origin, destination in edges:
            dep_map[origin].append(destination)
        # Hand back a plain dict so that lookups of unknown nodes don't silently create entries.
        return dict(dep_map)

    @staticmethod
    def incoming_edge_map_from_dep_map(dep_map):
        # type: (Dict[str, List[str]]) -> Dict[str, List[str]]
        """Converts an outgoing edge map into its inverse, a {destination:[origins]} incoming edge map."""
        incoming = defaultdict(list)
        for node, outgoing in dep_map.items():
            for out in outgoing:
                incoming[out].append(node)
        # Roots are never a destination, so they did not show up in the loop above.
        # Give them an explicit empty incoming list.
        for node in dep_map:
            incoming.setdefault(node, [])
        return dict(incoming)

    @staticmethod
    def reachability_set(dep_map, root_node_name):
        # type: (Dict[str, List[str]], str) -> Set[str]
        """
        Returns a set of all nodes reachable from root_node_name (including itself).

        Leaf nodes, which never appear as keys of `dep_map`, are treated as having no
        outgoing edges, and every node is expanded at most once so cycles terminate.
        """
        seen = set()
        pending = [root_node_name]
        while pending:
            item = pending.pop()
            if item in seen:
                continue
            seen.add(item)
            pending.extend(dep_map.get(item, ()))
        return seen

    def find_roots_in_dep_map(self, dep_map):
        # type: (Dict[str, List[str]]) -> List[str]
        """
        Finds the roots in the DAG represented by a outgoing edge map.
        If it returns empty, then you have cycles and thus don't have a DAG.
        """
        incoming = self.incoming_edge_map_from_dep_map(dep_map)
        # A node with no incoming edges and some outgoing edges is a root in a DAG
        # Nodes with no edges are not really part of a graph, so we ignore them
        return [node for node, incoming_edges in incoming.items() if not incoming_edges and dep_map.get(node)]

    def identical_names(self, edges):
        # type: (List[List[str]]) -> Dict[str,int]
        """
        Returns how many times a buck target name occurs in a edge list, filtering out unique (count == 1) names.

        Only targets that appear as the origin of at least one edge are counted; each distinct
        origin path is counted once.
        """
        name_count = defaultdict(int)
        for origin in self.make_dep_map_from_edges(edges):
            name_count[self.extract_buck_target(origin)] += 1
        return {key: value for key, value in name_count.items() if value > 1}

    def biggest_root_name(self, dep_map):
        # type: (Dict[str, List[str]]) -> str
        """
        Finds the root with the most reachable nodes under it inside a DAG.
        The biggest root is probably the app tree.
        With this you don't have to pass in the root node name to self.read_dot_file(...)

        :raises ValueError: If the graph has no roots, which means it is cyclic
        """
        roots = self.find_roots_in_dep_map(dep_map)
        if not roots:
            raise ValueError("Cyclic dependency graph given (len(roots) == 0), aborting")
        if len(roots) == 1:
            return roots[0]
        # max() keeps the first root on ties, same as a strict `>` comparison loop would.
        return max(roots, key=lambda r: len(self.reachability_set(dep_map, r)))

    @staticmethod
    def mod_graph_from_dep_map(dep_map, root_node_name):
        # type: (Dict[str, List[str]],str) -> (ModuleNode, List[ModuleNode])
        """
        Converts an outgoing edge map (`dep_map`) into a ModuleNode graph
        that you can generate a mock app from. You have to provide the
        root node / application node name (`root_node_name`) for the graph.

        :return: A tuple of the application node and a list of every node in the graph
        :raises KeyError: If `root_node_name` is not the origin of any edge in `dep_map`
        """

        def make_mod(name):
            return ModuleNode(name, ModuleNode.LIBRARY)

        mod_map = {name: make_mod(name) for name in dep_map}

        # TODO Use biggest_root_name(dep_map) to pick the app node. If this works you don't
        # have to pass the app node name any more.
        app_node = mod_map[root_node_name]
        app_node.node_type = ModuleNode.APP

        for name, deps in dep_map.items():
            new_deps = []
            for dep_name in deps:
                # Leaf modules are never an origin, so they get created on first sight here.
                if dep_name not in mod_map:
                    mod_map[dep_name] = make_mod(dep_name)
                new_deps.append(mod_map[dep_name])
            mod_map[name].deps = new_deps

        # Materialize the values so callers always receive a real list.
        return app_node, list(mod_map.values())

    @staticmethod
    def write_struct(struct, path):
        # type: (object, str) -> None
        """Writes generic python objects to disk (not deserializable)"""
        with open(path, 'w') as f:
            pprint(struct, f)

    @staticmethod
    def write_edges(edges, path):
        # type: (List[List[str]], str) -> None
        """Writes edge lists to disk, one `"origin" -> "destination";` line per edge"""
        with open(path, 'w') as f:
            f.writelines('"{}" -> "{}";\n'.format(e[0], e[1]) for e in edges)

    def debug_dump(self, edges, structs):
        # type: (List[List[List[str]]], List[object]) -> None
        """
        Dumps various intermediate objects to files to help debugging.

        `edges` will dump dot files of a list of edge pairs. Type: [[(String,String)]]
        `structs` will dump a pretty print of python objects. Type: [PythonObject]

        Files are written as `edges<i>.gv` and `struct<i>.py` inside a `ub_dot_graph_dump`
        directory under the system temp directory.
        """
        dump_path = os.path.join(tempfile.gettempdir(), 'ub_dot_graph_dump')
        logging.info('Dumping dotreader.py debug structures to %s', dump_path)
        makedir(dump_path)

        for i, edge in enumerate(edges):
            path = os.path.join(dump_path, 'edges{}.gv'.format(i))
            self.write_edges(edge, path)

        for i, struct in enumerate(structs):
            path = os.path.join(dump_path, 'struct{}.py'.format(i))
            self.write_struct(struct, path)