in odps/df/backends/formatter.py [0:0]
def _to_dot(self):
    """Render ``self._dag`` as Graphviz DOT source text.

    The output is a ``digraph DataFrameDAG`` containing a single ``START``
    node plus one ``subgraph clusterSTAGE<n>`` per node returned by
    ``self._dag.topological_sort()`` (numbered from 1 in that order).

    Inside each stage:

    * every expression yielded by ``traverse_until_source(node.expr,
      unique=True)`` becomes a record-shaped ``EXPR<id>`` node whose label
      is ``self._format_expr(expr)`` followed by one ``<f<i>>`` port per
      argument slot whose value is not ``None``; ``CollectionExpr``
      instances are filled light grey;
    * every argument is linked to the matching port of its consumer;
    * an expression without any non-``None`` argument slot is linked from
      ``START`` (when the stage has no predecessors and the expression is
      a ``CollectionExpr``) or from the expression of each predecessor
      stage of the same kind (both ``Scalar`` or both ``CollectionExpr``);
    * when the stage node has a ``_sql`` method returning non-empty text,
      a ``COMPILED<id>`` node holding that text as an HTML-like table is
      attached to the stage's root expression with a dashed edge.

    :return: the DOT document as returned by the text buffer's
        ``getvalue()``.
    """
    # ``cgi.escape`` was removed in Python 3.8; ``html.escape`` is its
    # replacement.  The fallback keeps Python 2 (no ``html`` module) working.
    try:
        from html import escape as escape_markup
    except ImportError:
        from cgi import escape as escape_markup

    buffer = six.StringIO()

    def write(text):
        buffer.write(to_text(text))

    def write_newline(text):
        write(text if text.endswith('\n') else text + '\n')

    def write_indent_newline(text, ind=1):
        write_newline(indent(text, 2 * ind))

    # One counter for all expressions of the graph, so EXPR ids stay unique
    # across stages and edges may cross stage boundaries.
    nid = itertools.count(1)

    write_newline('digraph DataFrameDAG {')
    write_indent_newline('START [shape=ellipse, label="start", style=filled, fillcolor=Pink];')

    nodes = self._dag.topological_sort()
    # Maps id(object) -> number already handed out for that object.
    traversed = dict()
    for sid, node in enumerate(nodes, 1):
        expr_node = node.expr
        traversed[id(node)] = sid
        pres = self._dag.predecessors(node)

        write_indent_newline('subgraph clusterSTAGE{0} {{'.format(sid))
        write_indent_newline('label = "Stage {0}"'.format(sid), ind=2)

        compiled = str(node._sql()) if hasattr(node, '_sql') else None

        for expr in traverse_until_source(expr_node, unique=True):
            # An expression may already own an id because it was seen
            # earlier as somebody's argument.
            if id(expr) not in traversed:
                eid = next(nid)
                traversed[id(expr)] = eid
            else:
                eid = traversed[id(expr)]

            name_args = list(expr.iter_args())

            # Record label: formatted expression, then one port per
            # argument slot that actually holds something.
            labels = [self._format_expr(expr)]
            for i, (arg_name, args) in enumerate(name_args):
                if args is None:
                    continue
                labels.append('<f{0}>{1}'.format(i, arg_name.strip('_')))

            attr = ''
            if isinstance(expr, CollectionExpr):
                attr = ', style=filled, fillcolor=LightGrey'
            write_indent_newline(
                'EXPR{0} [shape=record, label="{1}"{2}];'.format(eid, '|'.join(labels), attr),
                ind=2)

            no_child = True
            for i, (_, args) in enumerate(name_args):
                if args is None:
                    continue
                # A slot holds either a single argument or an iterable of them.
                arg_list = args if isinstance(args, Iterable) else (args,)
                for arg in arg_list:
                    if id(arg) not in traversed:
                        traversed[id(arg)] = next(nid)
                    write_indent_newline(
                        'EXPR{0} -> EXPR{1}:f{2};'.format(traversed[id(arg)], eid, i), ind=2)
                # Even an empty iterable counts as "has a child slot".
                no_child = False

            if not no_child:
                continue

            # Expression without inputs of its own: hook it to START or to
            # the matching expressions of the predecessor stages.
            if not pres:
                if isinstance(expr, CollectionExpr):
                    write_indent_newline('START -> EXPR{0};'.format(eid), ind=2)
                continue

            for pre in pres:
                pre_expr = pre.expr
                pid = traversed[id(pre_expr)]
                both_scalar = isinstance(pre_expr, Scalar) and isinstance(expr, Scalar)
                both_collection = (isinstance(pre_expr, CollectionExpr)
                                   and isinstance(expr, CollectionExpr))
                if both_scalar or both_collection:
                    write_indent_newline('EXPR{0} -> EXPR{1};'.format(pid, eid), ind=2)

        if compiled:
            eid = traversed[id(expr_node)]
            # quote=False reproduces the historical ``cgi.escape`` default:
            # only ``&``, ``<`` and ``>`` are escaped.
            rows = ''.join(
                '<TR><TD ALIGN="LEFT">%s</TD></TR>' % escape_markup(line, quote=False)
                for line in compiled.split('\n'))
            table = '<TABLE ALIGN="LEFT" BORDER="0">%s</TABLE>' % rows

            write_indent_newline(
                'COMPILED{0} [shape=record, style="filled", fillcolor="SkyBlue", label=<\n'
                .format(eid), ind=2)
            write_indent_newline(table, ind=3)
            write_indent_newline('>];', ind=2)

            write_indent_newline(
                'EXPR{0} -> COMPILED{0} [arrowhead = none, style = dashed];'.format(eid), ind=2)

        # Close this stage's subgraph.
        write_indent_newline('}')

    write('}')

    return buffer.getvalue()