def _to_dot()

in odps/df/backends/formatter.py [0:0]


    def _to_dot(self):
        """Render the execution DAG as Graphviz DOT source.

        Each node of ``self._dag`` (in the order returned by
        ``topological_sort()``) becomes one ``clusterSTAGE<n>`` subgraph.
        Inside a stage, every expression yielded by
        ``traverse_until_source`` for the node's expression is written as a
        record-shaped ``EXPR<id>`` vertex with one ``f<i>`` port per
        non-``None`` argument, and every argument is linked into the port it
        feeds.  Expressions without arguments are linked either to ``START``
        (collections in a stage that has no predecessors) or to the
        expressions of the predecessor stages when both sides are scalars or
        both are collections.  When the node has a ``_sql`` method that
        yields non-empty text, that text is attached to the stage's root
        expression as a ``COMPILED<id>`` vertex with an HTML-like label.

        :return: the DOT document as text
        """
        # ``cgi.escape`` was removed in Python 3.8 (and ``cgi`` itself in
        # 3.13); prefer ``html.escape`` and fall back to ``cgi`` only on
        # interpreters that lack the ``html`` module.
        try:
            from html import escape as escape_html
        except ImportError:  # Python 2
            from cgi import escape as escape_html

        buffer = six.StringIO()

        def write(text):
            buffer.write(to_text(text))

        def write_newline(text):
            write(text if text.endswith('\n') else text + '\n')

        def write_indent_newline(text, ind=1):
            write_newline(indent(text, 2 * ind))

        id_counter = itertools.count(1)
        # Maps id(object) -> number used in the DOT vertex name.
        traversed = dict()

        def vertex_id(obj):
            # Hand out a fresh number the first time an object is seen so an
            # edge may mention an expression before its vertex is declared.
            key = id(obj)
            if key not in traversed:
                traversed[key] = next(id_counter)
            return traversed[key]

        write_newline('digraph DataFrameDAG {')
        write_indent_newline('START [shape=ellipse, label="start", style=filled, fillcolor=Pink];')

        nodes = self._dag.topological_sort()
        for sid, node in enumerate(nodes, 1):
            expr_node = node.expr
            traversed[id(node)] = sid

            pres = self._dag.predecessors(node)
            write_indent_newline('subgraph clusterSTAGE{0} {{'.format(sid))
            write_indent_newline('label = "Stage {0}"'.format(sid), ind=2)

            compiled = str(node._sql()) if hasattr(node, '_sql') else None

            for expr in traverse_until_source(expr_node, unique=True):
                eid = vertex_id(expr)

                name_args = list(expr.iter_args())
                labels = [self._format_expr(expr)]
                labels.extend(
                    '<f{0}>{1}'.format(i, name.strip('_'))
                    for i, (name, args) in enumerate(name_args)
                    if args is not None)

                attr = ', style=filled, fillcolor=LightGrey' if isinstance(expr, CollectionExpr) else ''
                write_indent_newline(
                    'EXPR{0} [shape=record, label="{1}"{2}];'.format(eid, '|'.join(labels), attr), ind=2)

                # One edge per argument, pointing at the port of the
                # argument slot it occupies.
                has_child = False
                for i, (_, args) in enumerate(name_args):
                    if args is None:
                        continue
                    for arg in (args if isinstance(args, Iterable) else [args]):
                        write_indent_newline(
                            'EXPR{0} -> EXPR{1}:f{2};'.format(vertex_id(arg), eid, i), ind=2)
                    has_child = True

                if has_child:
                    continue

                # Leaf expression: hook it to the graph entry or to the
                # expressions produced by the preceding stages.
                if not pres:
                    if isinstance(expr, CollectionExpr):
                        write_indent_newline('START -> EXPR{0};'.format(eid), ind=2)
                    continue

                for pre in pres:
                    pre_expr = pre.expr
                    pid = traversed[id(pre_expr)]
                    both_scalar = isinstance(pre_expr, Scalar) and isinstance(expr, Scalar)
                    both_collection = isinstance(pre_expr, CollectionExpr) and \
                        isinstance(expr, CollectionExpr)
                    if both_scalar or both_collection:
                        write_indent_newline('EXPR{0} -> EXPR{1};'.format(pid, eid), ind=2)

            if compiled:
                eid = traversed[id(expr_node)]
                # quote=False matches the historical ``cgi.escape`` default:
                # only ``&``, ``<`` and ``>`` are replaced.
                rows = ''.join(
                    '<TR><TD ALIGN="LEFT">%s</TD></TR>' % escape_html(line, quote=False)
                    for line in compiled.split('\n'))
                table = '<TABLE ALIGN="LEFT" BORDER="0">%s</TABLE>' % rows

                write_indent_newline(
                    'COMPILED{0} [shape=record, style="filled", fillcolor="SkyBlue", label=<\n'
                        .format(eid), ind=2)
                write_indent_newline(table, ind=3)
                write_indent_newline('>];', ind=2)
                write_indent_newline(
                    'EXPR{0} -> COMPILED{0} [arrowhead = none, style = dashed];'.format(eid), ind=2)

            write_indent_newline('}')

        write('}')

        return buffer.getvalue()