in lib/graph.py [0:0]
def __str__(self):
    """Return a ``digraph G { ... }`` text rendering of the job graph.

    Each job yielded by ``self.iter_jobs()`` contributes, from its
    ``"wrapper_arguments"`` mapping:

    * one ``CommandNode`` built from ``pipeline_name``, ``description``
      and ``command``;
    * one ``DependencyNode`` per entry of ``outputs``, with an edge
      command -> output;
    * one ``DependencyNode`` per entry of ``inputs``, with an edge
      input -> command.

    Nodes and edges are accumulated in sets, so objects that compare
    equal (e.g. a file that is the output of one job and the input of
    another, assuming the node classes define equality that way) are
    emitted only once.

    The text of each statement is whatever ``str()`` returns for the
    node/edge object. All node lines come first, then all edge lines;
    within each group the lines are sorted so that the result is the
    same on every run instead of following arbitrary set iteration
    order.
    """
    nodes = set()
    edges = set()

    for job in self.iter_jobs():
        args = job["wrapper_arguments"]
        cmd_node = CommandNode(
            args["pipeline_name"], args["description"], args["command"])
        nodes.add(cmd_node)

        # A falsy ``outputs``/``inputs`` value means "no dependencies";
        # ``or ()`` skips the loop exactly as the explicit truthiness
        # guard would.
        for output in args["outputs"] or ():
            out_node = DependencyNode(output)
            nodes.add(out_node)
            edges.add(Edge(src=cmd_node, dst=out_node))

        for inputt in args["inputs"] or ():
            in_node = DependencyNode(inputt)
            nodes.add(in_node)
            edges.add(Edge(src=in_node, dst=cmd_node))

    buf = ["digraph G {"]
    # str() is applied explicitly before %-formatting so tuple-like
    # node/edge objects are not unpacked as format arguments.
    # Sorting the rendered lines makes the output reproducible.
    buf.extend(sorted((" %s" % str(n)) for n in nodes))
    buf.extend(sorted((" %s" % str(e)) for e in edges))
    buf.append("}")
    return "\n".join(buf)