in lib/graph.py [0:0]
def build(self):
    """Fill ``self.nodes`` and ``self.edges`` from the jobs of ``self.iter_jobs()``.

    For every job yielded by ``self.iter_jobs()``:

    * one command node is created via ``self._make_cmd_node`` from the
      job's ``"complete"`` flag, whether it has a truthy ``"start_time"``,
      its optional ``"outcome"``, and the ``"pipeline_name"``,
      ``"description"`` and ``"command"`` wrapper arguments;
    * each entry of the wrapper arguments' ``"inputs"`` becomes a
      ``DependencyNode`` with an edge pointing to the command node;
    * each entry of the wrapper arguments' ``"outputs"`` becomes a
      ``DependencyNode`` with an edge coming from the command node.

    Raises:
        KeyError: if a job lacks ``"wrapper_arguments"`` or ``"complete"``,
            or its wrapper arguments lack ``"pipeline_name"``,
            ``"description"`` or ``"command"``.
    """
    for record in self.iter_jobs():
        wrapper_args = record["wrapper_arguments"]
        command_node = self._make_cmd_node(
            record["complete"],
            bool(record.get("start_time")),
            record.get("outcome"),
            wrapper_args["pipeline_name"],
            wrapper_args["description"],
            wrapper_args["command"],
        )
        self.nodes.add(command_node)

        # A missing, None or empty "inputs" entry contributes nothing.
        for consumed in wrapper_args.get("inputs") or []:
            source_node = lib.graph.DependencyNode(consumed)
            self.nodes.add(source_node)
            incoming = lib.graph.Edge(src=source_node, dst=command_node)
            self.edges.add(incoming)

        # Same treatment for "outputs", with the edge direction reversed.
        for produced in wrapper_args.get("outputs") or []:
            target_node = lib.graph.DependencyNode(produced)
            self.nodes.add(target_node)
            outgoing = lib.graph.Edge(src=command_node, dst=target_node)
            self.edges.add(outgoing)