tutorials-and-examples/example-notebooks/SigmaRuleImporter.ipynb (686 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Import and convert Neo23x0 Sigma scripts\n", "ianhelle@microsoft.com\n", "\n", "This notebook is a is a quick and dirty Sigma to Log Analytics converter.\n", "It uses the modules from sigmac package to do the conversion.\n", "\n", "Only a subset of the Sigma rules are convertible currently. Failure to convert\n", "could be for one or more of these reasons:\n", "- known limitations of the converter\n", "- mismatch between the syntax expressible in Sigma and KQL\n", "- data sources referenced in Sigma rules do not yet exist in Microsoft Sentinel\n", "\n", "The sigmac tool is downloadable as a package from PyPi but since we are downloading\n", "the rules from the repo, we also copy and import the package from the repo source.\n", "\n", "After conversion you can use an interactive browser to step through the rules and\n", "view (and copy/save) the KQL equivalents. You can also take the conversion results and \n", "use them in another way (e.g.bulk save to files).\n", "\n", "The notebook is all somewhat experimental and offered as-is without any guarantees" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Download and unzip the Sigma repo" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "gather": { "logged": 1617996628345 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from pathlib import Path\n", "from IPython.display import display, HTML\n", "\n", "REQ_PYTHON_VER = \"3.6\"\n", "REQ_MSTICPY_VER = \"1.0.0\"\n", "\n", "display(HTML(\"<h3>Starting Notebook setup...</h3>\"))\n", "\n", "# If not using Azure Notebooks, install msticpy with\n", "# %pip install msticpy\n", "\n", "from msticpy.nbtools import nbinit\n", "nbinit.init_notebook(namespace=globals());" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1617996638417 } }, "outputs": [], "source": [ "import requests\n", "# Download the repo ZIP\n", "sigma_git_url = 'https://github.com/Neo23x0/sigma/archive/master.zip'\n", "r = requests.get(sigma_git_url)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1617996641107 } }, "outputs": [], "source": [ "from ipywidgets import widgets, Layout\n", "import os\n", "from pathlib import Path\n", "def_path = Path.joinpath(Path(os.getcwd()), \"sigma\")\n", "path_wgt = widgets.Text(value=str(def_path), \n", " description='Path to extract to zipped repo files: ', \n", " layout=Layout(width='50%'),\n", " style={'description_width': 'initial'})\n", "path_wgt" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "gather": { "logged": 1617996647717 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "RULES_REL_PATH = 'sigma-master/rules'\n", "rules_root = Path(path_wgt.value) / RULES_REL_PATH" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "Note that this can take some time to complete" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1617996779748 } }, "outputs": [], "source": [ "import zipfile\n", "import io\n", "repo_zip = io.BytesIO(r.content)\n", "\n", "zip_archive = zipfile.ZipFile(repo_zip, mode='r')\n", "zip_archive.extractall(path=path_wgt.value)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check that we have the files\n", "You should see a folder with folders such as application, apt, windows..." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "%ls {rules_root}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Convert Sigma Files to Log Analytics Kql queries" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1617996791966 }, "scrolled": false }, "outputs": [], "source": [ "# Read the Sigma YAML file paths into a dict and make a\n", "# a copy for the target Kql queries\n", "from pathlib import Path\n", "from collections import defaultdict\n", "import copy\n", "\n", "def get_rule_files(rules_root):\n", " file_dict = defaultdict(dict)\n", " for file in Path(rules_root).resolve().rglob(\"*.yml\"):\n", " rel_path = Path(file).relative_to(rules_root)\n", " path_key = '.'.join(rel_path.parent.parts)\n", " file_dict[path_key][rel_path.name] = file\n", " return file_dict\n", " \n", "sigma_dict = get_rule_files(rules_root)\n", "kql_dict = copy.deepcopy(sigma_dict)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1617996797597 } }, "outputs": [], "source": [ "# Add downloaded sigmac tool to sys.path and import Sigmac functions\n", "import os\n", "import sys\n", "module_path = os.path.abspath(os.path.join('sigma/sigma-master/tools'))\n", "if module_path not in sys.path:\n", " sys.path.append(module_path)\n", "from sigma.parser.collection import SigmaCollectionParser\n", "from sigma.parser.exceptions import SigmaCollectionParseError, SigmaParseError\n", "from sigma.configuration import SigmaConfiguration, SigmaConfigurationChain\n", "from sigma.config.exceptions import SigmaConfigParseError, SigmaRuleFilterParseException\n", "from sigma.filter import SigmaRuleFilter\n", "import sigma.backends.discovery as backends\n", "from sigma.backends.base import BackendOptions\n", "from sigma.backends.exceptions import BackendError, NotSupportedError, PartialMatchError, FullMatchError" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "gather": { "logged": 1618001212174 }, "scrolled": false }, "outputs": [], "source": [ "# Sigma to Log Analytics Conversion\n", "import yaml\n", "_LA_MAPPINGS = '''\n", "fieldmappings:\n", " Image: NewProcessName\n", " ParentImage: ParentProcessName\n", " ParentCommandLine: NO_MAPPING\n", "'''\n", "\n", "NOT_CONVERTIBLE = 'Not convertible'\n", "\n", "def sigma_to_la(file_path):\n", " with open(file_path, 'r') as input_file:\n", " try:\n", " sigmaconfigs = SigmaConfigurationChain()\n", " sigmaconfig = SigmaConfiguration(_LA_MAPPINGS)\n", " sigmaconfigs.append(sigmaconfig)\n", " backend_options = BackendOptions(None, None)\n", " backend = backends.getBackend('ala')(sigmaconfigs, backend_options)\n", " parser = SigmaCollectionParser(input_file, sigmaconfigs, None)\n", " results = parser.generate(backend)\n", " kql_result = ''\n", " for result in results:\n", " kql_result += result\n", " except (NotImplementedError, NotSupportedError, TypeError):\n", " kql_result = NOT_CONVERTIBLE\n", " input_file.seek(0,0)\n", " sigma_txt = input_file.read()\n", " if not kql_result == NOT_CONVERTIBLE:\n", " try:\n", " kql_header = \"\\n\".join(get_sigma_properties(sigma_txt))\n", " kql_result = kql_header + \"\\n\" + kql_result\n", " except Exception as e:\n", " print(\"exception reading sigma YAML: \", e)\n", " print(sigma_txt, kql_result, sep='\\n')\n", " return sigma_txt, kql_result\n", "\n", "sigma_keys = ['title', 'description', 'tags', 'status', \n", " 'author', 'logsource', 'falsepositives', 'level']\n", "\n", "def get_sigma_properties(sigma_rule):\n", " sigma_docs = yaml.load_all(sigma_rule, Loader=yaml.SafeLoader)\n", " sigma_rule_dict = next(sigma_docs)\n", " for prop in sigma_keys:\n", " yield get_property(prop, sigma_rule_dict)\n", "\n", "def get_property(name, sigma_rule_dict):\n", " sig_prop = sigma_rule_dict.get(name, 'na')\n", " if isinstance(sig_prop, dict):\n", " sig_prop = ' '.join([f\"{k}: {v}\" for k, v in sig_prop.items()])\n", " return f\"// {name}: {sig_prop}\"\n", " \n", " \n", "_KQL_FILTERS = {\n", " 'date': ' | where TimeGenerated >= datetime({start}) and TimeGenerated <= datetime({end}) ',\n", " 'host': ' | where Computer has {host_name} '\n", "}\n", "\n", "def insert_at(source, insert, find_sub):\n", " pos = source.find(find_sub)\n", " if pos != -1:\n", " return source[:pos] + insert + source[pos:]\n", " else:\n", " return source + insert\n", " \n", "def add_filter_clauses(source, **kwargs):\n", " if \"{\" in source or \"}\" in source:\n", " source = (\"// Warning: embedded braces in source. Please edit if necessary.\\n\"\n", " + source)\n", " source = source.replace('{', '{{').replace('}', '}}')\n", " if kwargs.get('host', False):\n", " source = insert_at(source, _KQL_FILTERS['host'], '|')\n", " if kwargs.get('date', False):\n", " source = insert_at(source, _KQL_FILTERS['date'], '|')\n", " return source\n", "\n", "\n", "# Run the conversion\n", "print(\"Converting rules\")\n", "conv_counter = {}\n", "for categ, sources in sigma_dict.items():\n", " src_converted = 0\n", " print(\"\\n\", categ, end=\"\")\n", " for file_name, file_path in sources.items():\n", " try:\n", " sigma, kql = sigma_to_la(file_path)\n", " print(\".\", end=\"\")\n", " except:\n", " print(f\"Error converting {file_name} ({file_path})\")\n", " continue\n", " kql_dict[categ][file_name] = (sigma, kql)\n", " if not kql == NOT_CONVERTIBLE:\n", " src_converted += 1\n", " conv_counter[categ] = (len(sources), src_converted)\n", "\n", "print(\"\\nConversion statistics\")\n", "print(\"-\" * len(\"Conversion statistics\"))\n", "print('\\n'.join([f'{categ}: rules: {counter[0]}, converted: {counter[1]}'\n", " for categ, counter in conv_counter.items()]))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Display the results in an interactive browser\n", "\n", "> Note: in order to execute a query from the browser, run\n", "> the cells in the \"Execute query\" section first. Then\n", "> come back to the browser." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": false, "gather": { "logged": 1618001212717 }, "scrolled": false }, "outputs": [], "source": [ "from ipywidgets import widgets, Layout\n", "\n", "# Browser Functions\n", "def on_cat_value_change(change):\n", " queries_w.options = kql_dict[change['new']].keys()\n", " queries_w.value = queries_w.options[0]\n", "\n", "def on_query_value_change(change):\n", " if view_qry_check.value:\n", " qry_text = kql_dict[sub_cats_w.value][queries_w.value][1]\n", " if \"Not convertible\" not in qry_text:\n", " qry_text = add_filter_clauses(qry_text,\n", " date=add_date_filter_check.value,\n", " host=add_host_filter_check.value)\n", " query_text_w.value = qry_text.replace('|', '\\n|')\n", " orig_text_w.value = kql_dict[sub_cats_w.value][queries_w.value][0]\n", "\n", "def on_view_query_value_change(change):\n", " vis = 'visible' if view_qry_check.value else 'hidden'\n", " on_query_value_change(None)\n", " query_text_w.layout.visibility = vis\n", " orig_text_w.layout.visibility = vis\n", "\n", "# Function defs for ExecuteQuery cell below\n", "def click_exec_hqry(b):\n", " global qry_results\n", " query_name = queries_w.value\n", " query_cat = sub_cats_w.value\n", " query_text = query_text_w.value\n", " query_text = query_text.format(**qry_wgt.query_params)\n", " disp_results(query_text)\n", "\n", "disp_result = None\n", "\n", "\n", "def disp_results(query_text):\n", " if disp_result is None:\n", " print(\n", " \"Cannot run query without authenticating.\",\n", " \"Please run subsequent cells first\"\n", " )\n", " return\n", " disp_result.update(\"Running query...\")\n", " qry_results = execute_kql_query(query_text)\n", " disp_result.update(qry_results)\n", " \n", "exec_hqry_button = widgets.Button(description=\"Execute query..\")\n", "\n", "exec_hqry_button.on_click(click_exec_hqry)\n", "\n", "# Browser widget setup\n", "categories = list(sorted(kql_dict.keys()))\n", "sub_cats_w = widgets.Select(options=categories, \n", " description='Category : ',\n", " layout=Layout(width='30%', height='120px'),\n", " style = {'description_width': 'initial'})\n", "\n", "queries_w = widgets.Select(options = kql_dict[categories[0]].keys(),\n", " description='Query : ',\n", " layout=Layout(width='30%', height='120px'),\n", " style = {'description_width': 'initial'})\n", "\n", "query_text_w = widgets.Textarea(\n", " value='',\n", " description='Kql Query:',\n", " layout=Layout(width='100%', height='300px', visiblity='hidden'),\n", " disabled=False)\n", "orig_text_w = widgets.Textarea(\n", " value='',\n", " description='Sigma Query:',\n", " layout=Layout(width='100%', height='250px', visiblity='hidden'),\n", " disabled=False)\n", "\n", "query_text_w.layout.visibility = 'hidden'\n", "orig_text_w.layout.visibility = 'hidden'\n", "sub_cats_w.observe(on_cat_value_change, names='value')\n", "queries_w.observe(on_query_value_change, names='value')\n", "\n", "view_qry_check = widgets.Checkbox(description=\"View query\", value=True)\n", "add_date_filter_check = widgets.Checkbox(description=\"Add date filter\", value=False)\n", "add_host_filter_check = widgets.Checkbox(description=\"Add host filter\", value=False)\n", "\n", "view_qry_check.observe(on_view_query_value_change, names='value')\n", "add_date_filter_check.observe(on_view_query_value_change, names='value')\n", "add_host_filter_check.observe(on_view_query_value_change, names='value')\n", "# view_qry_button.on_click(click_exec_hqry)\n", "# display(exec_hqry_button);\n", "\n", "vbox_opts = widgets.VBox([view_qry_check, add_date_filter_check, add_host_filter_check])\n", "hbox = widgets.HBox([sub_cats_w, queries_w, vbox_opts])\n", "vbox = widgets.VBox([hbox, orig_text_w, query_text_w])\n", "on_view_query_value_change(None)\n", "display(vbox)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Click the `Execute query` button below to run the currently displayed query\n", "**Notes:**\n", "- To run the queries, first authenticate to Microsoft Sentinel\n", "- If you added a date filter to the query set the date range below in the control below" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Authenticate to Microsoft Sentinel and Set Query Time bounds" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "collapsed": true, "gather": { "logged": 1618006646182 }, "jupyter": { "outputs_hidden": false, "source_hidden": false }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from msticpy.nbtools.nbwidgets import QueryTime\n", "from IPython.display import display\n", "from msticpy.data import QueryProvider\n", "from msticpy.common.wsconfig import WorkspaceConfig\n", "\n", "ws_config = WorkspaceConfig()\n", "qry_prov = QueryProvider(\"LogAnalytics\")\n", "qry_prov.connect(ws_config.code_connect_str)\n", "\n", "def clean_kql_comments(query_string):\n", " \"\"\"Cleans\"\"\"\n", " import re\n", " return re.sub(r'(//[^\\n]+)', '', query_string, re.MULTILINE).replace('\\n', '').strip()\n", "\n", "def execute_kql_query(query_string):\n", " if not query_string or len(query_string.strip()) == 0:\n", " print('No query supplied')\n", " return None\n", " src_query = clean_kql_comments(query_string)\n", " src_query = src_query.format(start=qry_wgt.start, end=qry_wgt.end)\n", " result = qry_prov.exec_query(src_query)\n", " \n", " return result\n", "\n", "disp_result = display(display_id=True)\n", "\n", "def exec_query_btn(btn):\n", " query = query_text_w.value\n", " result = execute_kql_query(query)\n", " disp_result.update(result)\n", "\n", "exec_hqry_button = widgets.Button(description=\"Execute Query\")\n", "exec_hqry_button.on_click(exec_query_btn)\n", "\n", "qry_wgt = QueryTime(units='days', before=5, after=0, max_before=30, max_after=10)\n", "\n", "display(qry_wgt)" ] }, { "cell_type": "markdown", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "### Execute the Query" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "display(exec_hqry_button)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Save All Converted Files" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "path_save_wgt = widgets.Text(value=str(def_path) + \"_kql_out\",\n", " description='Path to save KQL files: ',\n", " layout=Layout(width='50%'),\n", " style={'description_width': 'initial'})\n", "path_save_wgt" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "root = Path(path_save_wgt.value)\n", "root.mkdir(exist_ok=True)\n", "for categ, kql_files in kql_dict.items():\n", " sub_dir = root.joinpath(categ)\n", " \n", " for file_name, contents in kql_files.items():\n", " kql_txt = contents[1]\n", " if not kql_txt == NOT_CONVERTIBLE:\n", " sub_dir.mkdir(exist_ok=True)\n", " file_path = sub_dir.joinpath(file_name.replace('.yml', '.kql'))\n", " with open(file_path, 'w') as output_file:\n", " output_file.write(kql_txt)\n", " print(f\"Saved {file_path}\")\n" ] } ], "metadata": { "hide_input": false, "kernel_info": { "name": "python38-azureml" }, "kernelspec": { "display_name": "Python 3.8 - AzureML", "language": "python", "name": "python38-azureml" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.1" }, "nteract": { "version": "nteract-front-end@1.0.0" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": false, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": false }, "varInspector": { "cols": { "lenName": 16, "lenType": 16, "lenVar": 40 }, "kernels_config": { "python": { "delete_cmd_postfix": "", "delete_cmd_prefix": "del ", "library": "var_list.py", "varRefreshCmd": "print(var_dic_list())" }, "r": { "delete_cmd_postfix": ") ", "delete_cmd_prefix": "rm(", "library": "var_list.r", "varRefreshCmd": "cat(var_dic_list()) " } }, "types_to_exclude": [ "module", "function", "builtin_function_or_method", "instance", "_Feature" ], "window_display": false } }, "nbformat": 4, "nbformat_minor": 2 }