in geneve/utils/ast_dag.py [0:0]
def visit_ast(node, ctx, negate=False):
    """Draw the subtree rooted at ``node`` on ``ctx.graph`` and return its id.

    Every node id comes from ``get_node_id`` applied to the rendered text of
    the node. A child subtree is always drawn before the edge pointing at it.

    Args:
        node: AST node to draw. ``eql.ast.Literal`` is matched with
            ``isinstance`` (so subclasses are accepted); every other
            supported type must match exactly.
        ctx: Drawing context. Only its ``graph`` attribute is used directly
            here; the context itself is also handed to ``new_color``.
        negate: Flipped each time the walk descends through a ``Not`` node.
            It selects which fan-outs wrap each branch in
            ``new_color(ctx, "edge")``: ``Or`` terms, ``InSet`` members and
            trailing ``FunctionCall`` arguments when falsy, ``And`` terms
            when truthy.

    Returns:
        The id computed for ``node``. ``EventQuery`` and ``PipedQuery`` add
        no graph node of their own, yet still return their own id rather
        than the id of the query they wrap.

    Raises:
        ValueError: If the type of ``node`` is not one of the handled types.
    """
    node_id = get_node_id(node.render())
    kind = type(node)

    def reset_edge_style():
        # Edges declared after this point are plain black and solid until
        # something changes the edge attributes again.
        ctx.graph.attr("edge", color="black", style="solid")

    def connect(child, child_negate):
        # Subtree first, then the edge from this node down to it.
        child_id = visit_ast(child, ctx, child_negate)
        ctx.graph.edge(node_id, child_id)

    def fan_out(children, recolor, reset_each=False):
        # Draw every child as a separate branch of this node.
        # NOTE(review): new_color presumably gives each branch its own edge
        # color -- confirm against its definition.
        for child in children:
            if reset_each:
                reset_edge_style()
            scope = new_color(ctx, "edge") if recolor else nullcontext()
            with scope:
                connect(child, negate)

    # Leaves: labelled with their own rendered text.
    if isinstance(node, eql.ast.Literal) or kind is eql.ast.Field:
        ctx.graph.node(node_id, node.render())
        return node_id

    if kind is eql.ast.Or:
        ctx.graph.node(node_id, "or")
        fan_out(node.terms, recolor=not negate, reset_each=True)
        return node_id

    if kind is eql.ast.And:
        ctx.graph.node(node_id, "and")
        fan_out(node.terms, recolor=negate, reset_each=True)
        return node_id

    if kind is eql.ast.Not:
        ctx.graph.node(node_id, "not")
        connect(node.term, not negate)
        return node_id

    # Null checks are drawn as a comparison against a "null" node.
    if kind is eql.ast.IsNull or kind is eql.ast.IsNotNull:
        null_id = get_node_id("null")
        ctx.graph.node(node_id, "==" if kind is eql.ast.IsNull else "!=")
        operand_id = visit_ast(node.expr, ctx, negate)
        ctx.graph.node(null_id, "null")
        ctx.graph.edge(node_id, operand_id)
        ctx.graph.edge(node_id, null_id)
        return node_id

    if kind is eql.ast.InSet:
        ctx.graph.node(node_id, "in")
        reset_edge_style()
        connect(node.expression, negate)
        fan_out(node.container, recolor=not negate)
        return node_id

    if kind is eql.ast.Comparison:
        ctx.graph.node(node_id, node.comparator)
        # Both operands are drawn before either edge is added.
        lhs_id = visit_ast(node.left, ctx, negate)
        rhs_id = visit_ast(node.right, ctx, negate)
        for operand_id in (lhs_id, rhs_id):
            ctx.graph.edge(node_id, operand_id)
        return node_id

    # Query wrappers: only the wrapped part is drawn.
    if kind is eql.ast.EventQuery:
        visit_ast(node.query, ctx, negate)
        return node_id

    if kind is eql.ast.PipedQuery:
        visit_ast(node.first, ctx, negate)
        return node_id

    if kind is eql.ast.FunctionCall:
        ctx.graph.node(node_id, node.name.lower())
        reset_edge_style()
        # The first argument is linked directly; the remaining ones are
        # drawn as separate branches.
        connect(node.arguments[0], negate)
        fan_out(node.arguments[1:], recolor=not negate)
        return node_id

    raise ValueError(f"Unable to draw node type: {type(node)}")