scenario-notebooks/Hunting-Notebooks/AutomateTools_ParquetGenerator.ipynb

{ "cells": [ { "cell_type": "markdown", "source": [ "# Automate Tools - Parquet Files Generator\n", "\n", "__Notebook Version:__ 1.0<br>\n", "__Python Version:__ Python 3.8<br>\n", "__Apache Spark Version:__ 3.1<br>\n", "__Required Packages:__ azure-monitor-query, azure-mgmt-loganalytics<br>\n", "__Platforms Supported:__ Azure Synapse Analytics\n", " \n", "### Description\n", "\n", "## Table of Contents\n", "1. Warm-up\n", "2. Azure Log Analytics Data Queries\n", "3. Save result to Azure Log Analytics Custom Table" ], "metadata": {} }, { "cell_type": "markdown", "source": [ "## 1. Warm-up" ], "metadata": {} }, { "cell_type": "code", "source": [ "# Load Python libraries that will be used in this notebook\n", "from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n", "from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n", "from azure.monitor.ingestion import LogsIngestionClient\n", "from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n", "from azure.core.exceptions import HttpResponseError \n", "\n", "import sys\n", "from datetime import datetime, timezone, timedelta\n", "import requests\n", "import pandas as pd\n", "import numpy\n", "import json\n", "import math\n", "import ipywidgets\n", "from IPython.display import display, HTML, Markdown" ], "outputs": [], "execution_count": null, "metadata": {} }, { "cell_type": "code", "source": [ "# User input for Log Analytics workspace as the data source for querying\r\n", "subscription_id_source = \"\"\r\n", "resource_group_name_source = \"\"\r\n", "workspace_name_source = \"\"\r\n", "workspace_id_source = \"\"\r\n", "workspace_resource_id_source = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id_source, resource_group_name_source, workspace_name_source)\r\n" ], "outputs": [], "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "code", "source": [ "# User input for Log Analytics workspace for data ingestion\r\n", "tenant_id = \"\"\r\n", "subscription_id = \"\"\r\n", "workspace_id = \"\"\r\n", "resource_group_name = \"\"\r\n", "location = \"\"\r\n", "workspace_name = ''\r\n", "workspace_resource_id = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n", "data_collection_endpoint_name = \"\"\r\n", "data_collection_rule_name = \"\"\r\n", "custom_table_name = \"\"\r\n", "stream_name = \"Custom-\" + custom_table_name\r\n", "immutable_rule_id = \"\"\r\n", "dce_endpoint = \"\"\r\n", "\r\n", "akv_name = \"\"\r\n", "client_id_name = \"\"\r\n", "client_secret_name = \"\"\r\n", "akv_link_name = \"\"" ], "outputs": [], "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "code", "source": [ "# You may need to change resource_uri for various cloud environments.\r\n", "resource_uri = \"https://api.loganalytics.io\"\r\n", "client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n", "client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n", "\r\n", "credential = ClientSecretCredential(\r\n", " tenant_id=tenant_id, \r\n", " client_id=client_id, \r\n", " client_secret=client_secret)\r\n", 
"access_token = credential.get_token(resource_uri + \"/.default\")\r\n", "token = access_token[0]" ], "outputs": [], "execution_count": null, "metadata": {} }, { "cell_type": "markdown", "source": [ "## 2. Azure Log Analytics Data Queries" ], "metadata": { "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "code", "source": [ "# Functions for query\r\n", "def query_la(workspace_id_query, query):\r\n", " la_data_client = LogsQueryClient(credential=credential)\r\n", " end_time = datetime.now(timezone.utc)\r\n", " start_time = end_time - timedelta(15)\r\n", "\r\n", " query_result = la_data_client.query_workspace(\r\n", " workspace_id=workspace_id_query,\r\n", " query=query,\r\n", " timespan=(start_time, end_time))\r\n", " \r\n", " df_la_query = pd.DataFrame\r\n", "\r\n", " if query_result.status == LogsQueryStatus.SUCCESS:\r\n", " if hasattr(query_result, 'tables'):\r\n", " data = query_result.tables\r\n", " if len(query_result.tables) > 1:\r\n", " print('You have more than one tyable to processs')\r\n", " elif query_result.status == LogsQueryStatus.PARTIAL:\r\n", " data=query_result.partial_data\r\n", " print(query_result.partial_error)\r\n", " else:\r\n", " print(query_result.error)\r\n", " \r\n", " if len(query_result.tables) > 1:\r\n", " print('You have more than one tyable to processs')\r\n", " for table in data:\r\n", " df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)\r\n", " return df_la_query\r\n", "\r\n", "def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):\r\n", " \"Slice the time to render records <= 500K\"\r\n", " count_query = query.format(lookback_start, lookback_unit, lookback_end)\r\n", " count = ' | summarize count()'\r\n", " count_query = count_query + count\r\n", " df_count = query_la(workspace_id_source, count_query)\r\n", " row_count = df_count['count_'][0]\r\n", " print(count_query)\r\n", " print(row_count)\r\n", " df_final = pd.DataFrame()\r\n", "\r\n", " if row_count > query_row_limit:\r\n", " number_of_divide = 0\r\n", " while row_count > query_row_limit:\r\n", " row_count = row_count / split_factor\r\n", " number_of_divide = number_of_divide + 1\r\n", "\r\n", " factor = split_factor ** number_of_divide\r\n", " step_number = math.ceil(int(lookback_start) / factor)\r\n", " if factor > int(lookback_start) and lookback_unit == 'h':\r\n", " lookback_unit = 'm'\r\n", " number_of_minutes = 60\r\n", " step_number = math.ceil(int(lookback_start)*number_of_minutes / factor)\r\n", "\r\n", " try:\r\n", " for i in range(int(lookback_end), factor + 1, 1):\r\n", " if i > 0:\r\n", " df_la_query = pd.DataFrame\r\n", " current_query = query.format(i * step_number, lookback_unit, (i - 1) * step_number)\r\n", " print(current_query)\r\n", " df_la_query = query_la(workspace_id_source, current_query)\r\n", " print(df_la_query.shape[0])\r\n", " df_final = pd.concat([df_final, df_la_query])\r\n", " except:\r\n", " print(\"query failed\")\r\n", " raise\r\n", " else:\r\n", " df_final = query_la(workspace_id_source, query.format(lookback_start, lookback_unit, lookback_end))\r\n", "\r\n", " return df_final" ], "outputs": [], "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "markdown", "source": [ "### Slice data for query" ], "metadata": { "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "code", "source": [ "# Use Dror's test LA table\r\n", 
"query_template = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\r\n", "lookback_start = '4'\r\n", "\r\n", "df_final = slice_query_la(query_template, lookback_start)\r\n", "print(df_final.shape[0])" ], "outputs": [], "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "code", "source": [ "spark.conf.set(\"spark.sql.execution.arrow.enabled\",\"true\")\r\n", "spark_final=spark.createDataFrame(df_final) \r\n", "spark_final.printSchema()\r\n", "spark_final.show()" ], "outputs": [], "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "code", "source": [ "path = 'abfss://modsynapsefiles@modstorageforsynapse.dfs.core.windows.net/demodata/df_final/{0}'.format(datetime.now().strftime('%Y%m%d%H%M%S'))" ], "outputs": [], "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "code", "source": [ "spark_final.write.parquet(path, mode='overwrite')" ], "outputs": [], "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } } }, { "cell_type": "code", "source": [ "spark.read.parquet(path).count()" ], "outputs": [], "execution_count": null, "metadata": { "jupyter": { "source_hidden": false, "outputs_hidden": false }, "nteract": { "transient": { "deleting": false } } } } ], "metadata": { "kernelspec": { "name": "synapse_pyspark", "display_name": "Synapse PySpark" }, "language_info": { "name": "python" }, "description": null, "save_output": true, "synapse_widget": { "version": "0.1", "state": {} } }, "nbformat": 4, "nbformat_minor": 2 }