def toml_write()

in detection_rules/rule_formatter.py [0:0]


def toml_write(rule_contents, outfile=None):
    """Write rule in TOML.

    The ``metadata``, ``transform`` and ``rule`` sections of *rule_contents*
    are emitted in that order, each as its own TOML table. Sections that are
    missing or empty are skipped. *rule_contents* is deep-copied first, so the
    caller's object is never mutated.

    Within a section, keys are written alphabetically, with scalar values and
    flat lists first, followed by dicts and lists that contain dicts/lists.
    For the ``rule`` section, ``query`` and ``threat_query`` bypass the TOML
    encoder and are written verbatim (stripped) inside ``'''`` literal strings.

    Args:
        rule_contents: Mapping holding the rule sections to serialise.
        outfile: Where to write. A falsy value prints to stdout; an
            ``io.IOBase`` instance is written to and left open; any other
            truthy value is treated as a path, opened for writing as UTF-8
            and closed before returning.

    Returns:
        None.
    """
    def write(text, nl=True):
        # Emit *text* to the chosen sink, optionally followed by a newline.
        if outfile:
            outfile.write(text)
            if nl:
                outfile.write(u"\n")
        else:
            print(text, end='\n' if nl else '')

    encoder = RuleTomlEncoder()
    contents = copy.deepcopy(rule_contents)
    needs_close = False

    def order_rule(obj):
        # Recursively build an ordered version of *obj*: dicts become
        # key-sorted OrderedDicts, lists are sorted by the JSON dump of their
        # elements. For lists, nested dict/list elements are swapped for their
        # ordered versions IN PLACE before the sorted copy is returned.
        if isinstance(obj, dict):
            obj = OrderedDict(sorted(obj.items()))
            for k, v in obj.items():
                if isinstance(v, (dict, list)):
                    obj[k] = order_rule(v)
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                if isinstance(v, (dict, list)):
                    obj[i] = order_rule(v)
            obj = sorted(obj, key=json.dumps)

        return obj

    def _do_write(_data, _contents):
        # Serialise one section (*_data* is the table name, *_contents* its
        # mapping) and hand the resulting text to write(). *_contents* is
        # emptied as its keys are consumed.
        query = None
        threat_query = None
        placeholder = "XXxXX"

        if _data == 'rule':
            # - We want to avoid the encoder for the query and instead use kql-lint.
            # - Linting is done in rule.normalize() which is also called in rule.validate().
            # - Until lint has tabbing, this is going to result in all queries being flattened with no wrapping,
            #     but will at least purge extraneous white space
            # `or ''` keeps an explicit None value from crashing on .strip().
            query = (_contents.pop('query', '') or '').strip()

            # - As tags are expanding, we may want to reconsider the need to have them in alphabetical order;
            #   tags are therefore neither sorted nor de-duplicated here.
            threat_query = (_contents.pop('threat_query', '') or '').strip()

        top = OrderedDict()
        bottom = OrderedDict()

        # sorted() materialises the key list up front, so popping inside the loop is safe.
        for k in sorted(_contents):
            v = _contents.pop(k)

            if k == 'actions':
                # explicitly preserve formatting for message field in actions
                preserved_fields = ["params.message"]
                v = [preserve_formatting_for_fields(action, preserved_fields) for action in v] if v is not None else []

            if k == 'filters':
                # explicitly preserve formatting for value field in filters
                preserved_fields = ["meta.value"]
                v = [preserve_formatting_for_fields(meta, preserved_fields) for meta in v] if v is not None else []

            if k in ('note', 'setup', 'description') and isinstance(v, str):
                # Transform instances of \ to \\ as calling write will convert \\ to \.
                # This will ensure that the output file has the correct number of backslashes.
                v = v.replace("\\", "\\\\")

            if k == 'osquery' and isinstance(v, list):
                # Specifically handle transform.osquery queries
                for osquery_item in v:
                    if 'query' in osquery_item and isinstance(osquery_item['query'], str):
                        # Same backslash doubling as above, applied to each osquery query string.
                        osquery_item['query'] = osquery_item['query'].replace("\\", "\\\\")

            # Scalars and flat lists go to `top`; dicts and lists holding
            # dicts/lists go to `bottom`, which is appended after `top`.
            if isinstance(v, dict):
                bottom[k] = OrderedDict(sorted(v.items()))
            elif isinstance(v, list):
                if any(isinstance(value, (dict, list)) for value in v):
                    bottom[k] = v
                else:
                    top[k] = v
            elif k in get_preserved_fmt_fields():
                top[k] = NonformattedField(v)
            else:
                top[k] = v

        # Placeholders are appended last among the `top` keys and swapped for
        # the raw query text once the encoder has produced the dump.
        if query:
            top['query'] = placeholder

        if threat_query:
            top['threat_query'] = placeholder

        top.update(bottom)
        # Use this helper's own section name rather than the enclosing loop variable.
        top = toml.dumps(OrderedDict({_data: top}), encoder=encoder)

        # we want to preserve the threat_query format, but want to modify it in the context of encoded dump
        # NOTE: this must run before the `query` replacement below, because
        # 'query = "XXxXX"' is a substring of 'threat_query = "XXxXX"'.
        if threat_query:
            formatted_threat_query = "\nthreat_query = '''\n{}\n'''{}".format(threat_query, '\n\n' if bottom else '')
            top = top.replace('threat_query = "XXxXX"', formatted_threat_query)

        # we want to preserve the query format, but want to modify it in the context of encoded dump
        if query:
            formatted_query = "\nquery = '''\n{}\n'''{}".format(query, '\n\n' if bottom else '')
            top = top.replace('query = "XXxXX"', formatted_query)

        write(top)

    try:

        if outfile and not isinstance(outfile, io.IOBase):
            # TOML documents must be UTF-8; do not depend on the platform default encoding.
            outfile = open(outfile, 'w', encoding='utf-8')
            # Only flag for closing once the open has actually succeeded.
            needs_close = True

        for data in ('metadata', 'transform', 'rule'):
            _contents = contents.get(data, {})
            if not _contents:
                continue
            # NOTE(review): the return value is deliberately left unused, so only
            # order_rule's in-place effects (list elements replaced by their
            # ordered versions) reach `_contents`; top-level keys are sorted in
            # _do_write. Assigning the result would also sort top-level lists
            # and change the emitted output - confirm before changing.
            order_rule(_contents)
            _do_write(data, _contents)

    finally:
        if needs_close:
            outfile.close()