in detection_rules/rule_formatter.py [0:0]
def toml_write(rule_contents, outfile=None):
    """Write rule in TOML.

    The ``metadata``, ``transform`` and ``rule`` tables are emitted in that order; a table
    that is missing or empty is skipped. Within a table, scalar values and flat lists are
    written first, followed by dict values and lists that contain dicts/lists. The ``query``
    and ``threat_query`` fields of the ``rule`` table bypass the TOML encoder and are written
    as multi-line literal strings (''' ... ''') so their formatting is preserved.

    Args:
        rule_contents: Mapping of table name to table contents. It is deep-copied first, so
            the caller's object is never modified.
        outfile: Destination. A falsy value prints to stdout; an ``io.IOBase`` instance is
            written to and left open; any other value is treated as a path, opened for
            writing as UTF-8, and closed before returning (also on error).
    """
    # Temporary value encoded in place of query/threat_query, then swapped for the raw text.
    placeholder = "XXxXX"
    # Free-text fields whose backslashes are doubled before encoding.
    escaped_text_fields = ('note', 'setup', 'description')

    def write(text, nl=True):
        """Send ``text`` to ``outfile`` when set, otherwise to stdout; ``nl`` appends a newline."""
        if outfile:
            outfile.write(text)
            if nl:
                outfile.write(u"\n")
        else:
            print(text, end='\n' if nl else '')

    encoder = RuleTomlEncoder()
    contents = copy.deepcopy(rule_contents)
    needs_close = False

    def order_rule(obj):
        """Return ``obj`` with dicts key-sorted and lists sorted by their JSON dump, recursively.

        A dict argument is copied into a new ``OrderedDict`` and is not itself modified; a
        list argument has its dict/list elements replaced in place before a new sorted list
        is returned.
        """
        if isinstance(obj, dict):
            obj = OrderedDict(sorted(obj.items()))
            for k, v in obj.items():
                if isinstance(v, (dict, list)):
                    obj[k] = order_rule(v)

        if isinstance(obj, list):
            for i, v in enumerate(obj):
                if isinstance(v, (dict, list)):
                    obj[i] = order_rule(v)
            obj = sorted(obj, key=lambda x: json.dumps(x))

        return obj

    def _do_write(_data, _contents):
        """Encode one table (``_data`` is its name, ``_contents`` its dict) and write it out.

        ``_contents`` is emptied as a side effect: every key is popped while it is processed.
        """
        query = None
        threat_query = None

        if _data == 'rule':
            # - We want to avoid the encoder for the query and instead use kql-lint.
            # - Linting is done in rule.normalize() which is also called in rule.validate().
            # - Until lint has tabbing, this is going to result in all queries being flattened with no wrapping,
            #     but will at least purge extraneous white space
            query = _contents.pop('query', '').strip()

            # - As tags are expanding, we may want to reconsider the need to have them in alphabetical order
            # tags = contents['rule'].get("tags", [])
            #
            # if tags and isinstance(tags, list):
            #     contents['rule']["tags"] = list(sorted(set(tags)))
            threat_query = _contents.pop('threat_query', '').strip()

        top = OrderedDict()
        bottom = OrderedDict()

        for k in sorted(_contents):
            v = _contents.pop(k)

            if k == 'actions':
                # explicitly preserve formatting for message field in actions
                preserved_fields = ["params.message"]
                v = [preserve_formatting_for_fields(action, preserved_fields) for action in v] if v is not None else []
            elif k == 'filters':
                # explicitly preserve formatting for value field in filters
                preserved_fields = ["meta.value"]
                v = [preserve_formatting_for_fields(meta, preserved_fields) for meta in v] if v is not None else []
            elif k in escaped_text_fields and isinstance(v, str):
                # Transform instances of \ to \\ as calling write will convert \\ to \.
                # This will ensure that the output file has the correct number of backslashes.
                v = v.replace("\\", "\\\\")
            elif k == 'osquery' and isinstance(v, list):
                # Specifically handle transform.osquery queries: same backslash doubling as above.
                # Non-dict items are skipped instead of raising on the subscript.
                for osquery_item in v:
                    if isinstance(osquery_item, dict) and isinstance(osquery_item.get('query'), str):
                        osquery_item['query'] = osquery_item['query'].replace("\\", "\\\\")

            if isinstance(v, dict):
                bottom[k] = OrderedDict(sorted(v.items()))
            elif isinstance(v, list):
                # Lists holding nested containers are written after the flat values.
                if any(isinstance(value, (dict, list)) for value in v):
                    bottom[k] = v
                else:
                    top[k] = v
            elif k in get_preserved_fmt_fields():
                top[k] = NonformattedField(v)
            else:
                top[k] = v

        if query:
            top['query'] = placeholder

        if threat_query:
            top['threat_query'] = placeholder

        top.update(bottom)
        # Use the table name passed in, not the enclosing loop variable.
        top = toml.dumps(OrderedDict({_data: top}), encoder=encoder)

        # threat_query must be substituted before query: 'query = "..."' is a substring of
        # 'threat_query = "..."' and would otherwise match inside it.
        # we want to preserve the threat_query format, but want to modify it in the context of encoded dump
        if threat_query:
            formatted_threat_query = "\nthreat_query = '''\n{}\n'''{}".format(threat_query, '\n\n' if bottom else '')
            top = top.replace('threat_query = "{}"'.format(placeholder), formatted_threat_query)

        # we want to preserve the query format, but want to modify it in the context of encoded dump
        if query:
            formatted_query = "\nquery = '''\n{}\n'''{}".format(query, '\n\n' if bottom else '')
            top = top.replace('query = "{}"'.format(placeholder), formatted_query)

        write(top)

    try:
        if outfile and not isinstance(outfile, io.IOBase):
            # A path was given: open it ourselves and remember to close it.
            # TOML documents are UTF-8, so do not rely on the platform default encoding.
            outfile = open(outfile, 'w', encoding='utf-8')
            needs_close = True

        for data in ('metadata', 'transform', 'rule'):
            _contents = contents.get(data, {})
            if not _contents:
                continue
            # NOTE(review): the return value is discarded, so the top-level reordering done by
            # order_rule never reaches _contents; only elements of lists nested inside it are
            # replaced in place. Kept as-is because changing it would alter emitted ordering.
            order_rule(_contents)
            _do_write(data, _contents)

    finally:
        if needs_close:
            outfile.close()