in detection_rules/beats.py [0:0]
def get_datasets_and_modules(tree: Union[eql.ast.BaseNode, kql.ast.BaseNode]) -> tuple:
    """Get datasets and modules from an EQL or KQL AST.

    Iterates over every node yielded by ``tree`` and collects the values that
    ``event.module`` and ``event.dataset`` are compared against.

    Recognised node shapes:
      * EQL ``Comparison`` using the ``EQ`` comparator with a ``String`` on the right
      * EQL ``InSet`` whose expression is one of the two fields
      * KQL ``FieldComparison`` on one of the two fields (only ``String`` children
        of the comparison value are collected)

    :param tree: EQL or KQL AST to inspect; must be iterable over its nodes.
    :return: a ``(datasets, modules)`` tuple of sets -- note that datasets come first.
    """
    modules = set()
    datasets = set()

    # The field nodes compared against do not change between iterations,
    # so build them once instead of once per visited node.
    eql_module = eql.ast.Field("event", ["module"])
    eql_dataset = eql.ast.Field("event", ["dataset"])
    kql_module = kql.ast.Field("event.module")
    kql_dataset = kql.ast.Field("event.dataset")

    # extract out event.module and event.dataset from the query's AST
    for node in tree:
        if (isinstance(node, eql.ast.Comparison) and node.comparator == node.EQ
                and isinstance(node.right, eql.ast.String)):
            # NOTE(review): this branch stores render() output while the InSet branch
            # stores get_literals() output -- confirm both yield comparable values.
            if node.left == eql_module:
                modules.add(node.right.render())
            elif node.left == eql_dataset:
                datasets.add(node.right.render())
        elif isinstance(node, eql.ast.InSet):
            # get_literals() returns a collection of literals. Merge its members with
            # update(); add() would try to insert the collection itself as a single
            # element, which fails for unhashable containers such as lists.
            if node.expression == eql_module:
                modules.update(node.get_literals())
            elif node.expression == eql_dataset:
                datasets.update(node.get_literals())
        elif isinstance(node, kql.ast.FieldComparison) and node.field == kql_module:
            modules.update(child.value for child in node.value if isinstance(child, kql.ast.String))
        elif isinstance(node, kql.ast.FieldComparison) and node.field == kql_dataset:
            datasets.update(child.value for child in node.value if isinstance(child, kql.ast.String))

    return datasets, modules