tools/stix-to-ecs/extra/clean_stix.py (54 lines of code) (raw):

# coding: utf-8 import argparse import pathlib import json import typing OPENCTI = "x_opencti_" OLD_TLP = "clear" NEW_TLP = "white" def cleaning_callback(node: typing.Any) -> typing.Any: """ The function clean (or correct) a Json node if possible according to the following rules: - "x_open_cti_*" -> "x_*". - "white" -> "clear". :param: The Json node to be processed. :return: The Json node post processing. """ if type(node) is dict: for k in list(node.keys()): if k.startswith(OPENCTI): node[k.replace(OPENCTI, "x_")] = node.pop(k) if k == "name" or k == "tlp": if (tmp := node[k].lower()).endswith(OLD_TLP): tmp = tmp.replace(OLD_TLP, NEW_TLP) node[k] = tmp.upper() if node[k].isupper() else tmp return node def clean_stix(stix: str) -> str: """ The function clean a string containing STIX data. :param stix: STIX data to be cleaned. :return: Cleaned STIX data """ return json.dumps(visit_json(json.loads(stix), cleaning_callback)) def main() -> None: args = parse_arguments() with args.input.open("r") as i: with args.output.open("w") as o: o.write(clean_stix(i.read())) def parse_arguments() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Clean a STIX document according to a set of hardcoded rules." ) parser.add_argument( "-i", "--input", type=pathlib.Path, required=True, help="Input file path" ) parser.add_argument( "-o", "--output", type=pathlib.Path, required=True, help="Output file path" ) return parser.parse_args() def visit_json( root_node: dict[str, typing.Any], callback: typing.Callable ) -> dict[str, typing.Any]: """ The function visit a Json tree and apply the provided callback on each node. :param root_node: The Json root node. :param callback: The callback to be applyed on each node of the tree. :return: The Json root node """ return visit_json_aux(root_node, callback) def visit_json_aux( node: typing.Any, callback: typing.Callable, ) -> typing.Any: """ The function apply the callback on the current node and recursively call this function on node's children. :param node: The current Json node. :param callback: The callback to be applyed on the current node. :return: The current node """ node = callback(node) t = type(node) if t is dict: for k, v in node.items(): node[k] = visit_json_aux(v, callback) elif t is list: for i, x in enumerate(node): node[i] = visit_json_aux(x, callback) return node if __name__ == "__main__": main()