scenario-notebooks/Hunting-Notebooks/AIO_Hunting-AutomatedDataQueryAndIngestionToCustomTable.ipynb
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# AIO: Hunting - Automated Data Query and MDTI API and Ingestion to Custom Table\n",
"\n",
"__Notebook Version:__ 1.0<br>\n",
"__Python Version:__ Python 3.8<br>\n",
"__Apache Spark Version:__ 3.1<br>\n",
"__Required Packages:__ azure-monitor-query, azure-mgmt-loganalytics<br>\n",
"__Platforms Supported:__ Azure Synapse Analytics\n",
" \n",
"__Data Source Required:__ Log Analytics custom table defined\n",
" \n",
"### Description\n",
"This notebook provides step-by-step instructions and sample code to query various data from Azure Log Analytics and then store it back to Log Analytics pre-defined custom table while using asyncio functions for concurrency.<br>\n",
"*** Please run the cells sequentially to avoid errors. Please do not use \"run all cells\". *** <br>\n",
"Need to know more about KQL? [Getting started with Kusto Query Language](https://docs.microsoft.com/azure/data-explorer/kusto/concepts/).\n",
"\n",
"## Table of Contents\n",
"1. Warm-up\n",
"2. Azure Log Analytics Data Queries\n",
"3. Save result to Azure Log Analytics Custom Table"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Warm-up"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Load Python libraries that will be used in this notebook\n",
"from azure.monitor.query import LogsQueryStatus\n",
"from azure.monitor.query.aio import LogsQueryClient, MetricsQueryClient\n",
"from azure.monitor.ingestion.aio import LogsIngestionClient\n",
"\n",
"from azure.identity.aio import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n",
"from azure.core.exceptions import HttpResponseError\n",
"\n",
"import sys\n",
"from datetime import datetime, timezone, timedelta\n",
"import requests\n",
"import pandas as pd\n",
"import numpy\n",
"import json\n",
"import math\n",
"import ipywidgets\n",
"from IPython.display import display, HTML, Markdown\n",
"\n",
"import asyncio\n",
"\n",
"# Optionally use aiometer for granular throttling\n",
"# import functools\n",
"# !pip install aiometer\n",
"# import aiometer"
]
},
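{
"cell_type": "markdown",
"metadata": {},
"source": [
"If the Spark pool does not already provide the SDK packages imported above, they can be installed in-session. The package names below are inferred from the imports (azure-monitor-query, azure-monitor-ingestion, azure-identity); uncomment the install line only if your environment needs it."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: install the SDK packages used by this notebook if they are not pre-installed on the pool\n",
"# (package names inferred from the imports above)\n",
"# !pip install azure-monitor-query azure-monitor-ingestion azure-identity"
]
},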
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Azure KeyVault details\n",
"tenant_id = \"\"\n",
"akv_name = \"\"\n",
"client_id_name = \"\"\n",
"client_secret_name = \"\"\n",
"akv_link_name = \"\"\n",
"\n",
"# Get credential for Azure\n",
"def get_credential():\n",
" client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\n",
" client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\n",
"\n",
" return ClientSecretCredential(\n",
" tenant_id=tenant_id,\n",
" client_id=client_id,\n",
" client_secret=client_secret)"
]
},
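{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of an alternative credential for local or interactive runs, assuming you are outside Synapse and mssparkutils is unavailable: DefaultAzureCredential (already imported above) can stand in for the Key Vault backed service principal. This is an illustrative option, not part of the original flow; keep it commented out when running in Synapse."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Minimal sketch (assumption): outside Synapse, fall back to DefaultAzureCredential instead of Key Vault backed secrets\n",
"# def get_credential():\n",
"#     return DefaultAzureCredential()"
]
},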
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## 2. Azure Log Analytics Data Queries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"# User input for Log Analytics workspace as the data source for querying\n",
"workspace_id_source = \"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"async def throttle_gather(tasks, concurrent=5):\n",
" async def blocker(index, task, sem):\n",
" try:\n",
" return await task\n",
" finally:\n",
" sem.release()\n",
"\n",
" semaphore = asyncio.BoundedSemaphore(concurrent)\n",
" results = []\n",
" for i, t in enumerate(tasks):\n",
" await semaphore.acquire()\n",
" results.append(asyncio.create_task(blocker(i, t, semaphore)))\n",
"\n",
" return await asyncio.gather(*results)\n",
"\n",
"# Functions for query\n",
"async def query_la(la_data_client, workspace_id_query, query):\n",
" print(query)\n",
" end_time = datetime.now(timezone.utc)\n",
" start_time = end_time - timedelta(15)\n",
"\n",
" query_result = await la_data_client.query_workspace(\n",
" workspace_id=workspace_id_query,\n",
" query=query,\n",
" timespan=(start_time, end_time))\n",
" \n",
" df_la_query = pd.DataFrame\n",
"\n",
" if query_result.status == LogsQueryStatus.SUCCESS:\n",
" if hasattr(query_result, 'tables'):\n",
" data = query_result.tables\n",
" if len(query_result.tables) > 1:\n",
" print('You have more than one table to processs')\n",
" elif query_result.status == LogsQueryStatus.PARTIAL:\n",
" data=query_result.partial_data\n",
" print(query_result.partial_error)\n",
" else:\n",
" print(query_result.error)\n",
" \n",
" if len(query_result.tables) > 1:\n",
" print('You have more than one table to processs')\n",
" for table in data:\n",
" df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)\n",
" return df_la_query\n",
"\n",
"async def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):\n",
" \"Slice the time to render records <= 500K\"\n",
" async with get_credential() as credential, LogsQueryClient(credential=credential) as la_data_client:\n",
" count_query = query.format(lookback_start, lookback_unit, lookback_end)\n",
" count = ' | count'\n",
" count_query = count_query + count\n",
" df_count = await query_la(la_data_client, workspace_id_source, count_query)\n",
" row_count = df_count['Count'][0]\n",
" print(row_count)\n",
"\n",
" if row_count > query_row_limit:\n",
" number_of_divide = 0\n",
" while row_count > query_row_limit:\n",
" row_count = row_count / split_factor\n",
" number_of_divide = number_of_divide + 1\n",
"\n",
" factor = split_factor ** number_of_divide\n",
" step_number = math.ceil(int(lookback_start) / factor)\n",
" if factor > int(lookback_start) and lookback_unit == 'h':\n",
" lookback_unit = 'm'\n",
" number_of_minutes = 60\n",
" step_number = math.ceil(int(lookback_start)*number_of_minutes / factor)\n",
"\n",
" try:\n",
" # return pd.concat(await aiometer.run_all(\n",
" # (functools.partial(query_la, la_data_client, workspace_id_source, query.format(i * step_number, lookback_unit, (i - 1) * step_number))\n",
" # for i in range(int(lookback_end), factor + 1)\n",
" # if i > 0),\n",
" # max_at_once=5,\n",
" # max_per_second=6))\n",
" return pd.concat(await throttle_gather(\n",
" query_la(la_data_client, workspace_id_source, query.format(i * step_number, lookback_unit, (i - 1) * step_number))\n",
" for i in range(int(lookback_end), factor + 1)\n",
" if i > 0))\n",
" except:\n",
" print(\"query failed\")\n",
" raise\n",
" else:\n",
" return await query_la(la_data_client, workspace_id_source, query.format(lookback_start, lookback_unit, lookback_end))"
]
},
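{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check: a minimal sketch that exercises throttle_gather with dummy coroutines (no workspace required), illustrating that at most `concurrent` tasks run at once and that result order is preserved. The fake_query helper is hypothetical and used only here."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check for throttle_gather; fake_query is a hypothetical stand-in for a Log Analytics query\n",
"async def fake_query(i):\n",
"    await asyncio.sleep(0.1)\n",
"    return i\n",
"\n",
"demo_results = await throttle_gather((fake_query(i) for i in range(10)), concurrent=3)\n",
"print(demo_results)"
]
},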
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Slice data for query"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Use Dror's test LA table\n",
"query_template = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\n",
"lookback_start = '24'\n",
"\n",
"df_final = await slice_query_la(query_template, lookback_start)\n",
"print(df_final.shape[0])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"df_final"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"### Service Data: MDTI API"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Calling Microsoft MDTI API for List, the same template can be used for calling other Azure REST APIs with different parameters.\n",
"# For different environments, such as national clouds, you may need to use different root_url, please contact with your admins.\n",
"# It can be ---.azure.us, ---.azure.microsoft.scloud, ---.azure.eaglex.ic.gov, etc.\n",
"def call_mdti_api_for_read(token, resource):\n",
" \"Calling Microsoft MDTI API\"\n",
" headers = {\"Authorization\": token, \"content-type\":\"application/json\" }\n",
" root_url = \"https://graph.microsoft.com\"\n",
" mdti_url_template = \"{0}/beta/security/threatIntelligence/{1}\"\n",
" mdti_url = mdti_url_template.format(root_url, resource)\n",
" # print(mdti_url)\n",
" try:\n",
" response = requests.get(mdti_url, headers=headers, verify=True)\n",
" return response\n",
" except HttpResponseError as e:\n",
" print(f\"Calling MDTI API failed: {e}\")\n",
" return None\n",
"\n",
"async def get_token_for_graph():\n",
" resource_uri = \"https://graph.microsoft.com\"\n",
" async with get_credential() as credential:\n",
" access_token = await credential.get_token(resource_uri + \"/.default\")\n",
" return access_token[0]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Calling MDTI API, hosts as example\n",
"header_token_value = \"Bearer {}\".format(await get_token_for_graph())\n",
"response_mdti_host = call_mdti_api_for_read(header_token_value, \"hosts('www.microsoft.com')\")"
]
},
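{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before merging, it is worth confirming that the MDTI call succeeded; the merge cell below assumes the response body contains a registrar field. A minimal, hedged check (field name taken from the merge step below):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional: verify the MDTI response before using it in the merge step below\n",
"if response_mdti_host is not None and response_mdti_host.status_code == 200:\n",
"    print(response_mdti_host.json().get(\"registrar\"))\n",
"else:\n",
"    print(\"MDTI call did not succeed:\", None if response_mdti_host is None else response_mdti_host.status_code)"
]
},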
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Merge data\n",
"df_final.loc[df_final['ip'].str.startswith('23.'), 'Fact'] = response_mdti_host.json()[\"registrar\"]\n",
"df_merged = df_final.rename(columns = {'TimeGenerated': 'TimeGenerated', 'ip': 'IP', 'Fact': 'Fact'})[['TimeGenerated', 'IP', 'Fact']]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# df_merged"
]
},
{
"cell_type": "markdown",
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
},
"source": [
"## 3. Save result to Azure Log Analytics Custom Table"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# User input for Log Analytics workspace for data ingestion\n",
"custom_table_name = \"\"\n",
"stream_name = \"Custom-\" + custom_table_name\n",
"immutable_rule_id = \"\"\n",
"dce_endpoint = \"\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# function for data converting\n",
"def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):\n",
" listd = df.to_dict('records')\n",
" for row in listd:\n",
" # The dataframe may have more than one datetime columns, add all datetiome columns inside this loop, to render ISO 8601\n",
" if hasTimeGeneratedColumn and row['TimeGenerated'] != None:\n",
" row['TimeGenerated']= row['TimeGenerated'].strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\")\n",
" return listd\n",
"\n",
"def check_dataframe_size_in_mb(df, size_limit_in_mb=25):\n",
" \"Check if dataframe has more than 25 MB data, 30 MB is the limit for POST\"\n",
" size_in_mb = sys.getsizeof(df) / 1000000\n",
" return size_in_mb / size_limit_in_mb\n",
"\n",
"def partition_dataframe_for_data_infestion(df):\n",
" df_size = check_dataframe_size_in_mb(df)\n",
" if df_size > 1:\n",
" partition_number = math.ceil(df_size)\n",
" index_block = len(df) // partition_number\n",
" for i in range(0,df.shape[0],index_block):\n",
" yield df[i:i+index_block]\n",
" else:\n",
" yield df"
]
},
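{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional: a minimal, self-contained sketch that runs the two helpers above on a tiny hypothetical dataframe (sample values only), to show the shape of the payload that will be uploaded in the next cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Optional sanity check of the helpers on a tiny hypothetical dataframe\n",
"sample_df = pd.DataFrame({\n",
"    'TimeGenerated': [datetime.now(timezone.utc)],\n",
"    'IP': ['203.0.113.10'],\n",
"    'Fact': ['example registrar']\n",
"})\n",
"for chunk in partition_dataframe_for_data_ingestion(sample_df):\n",
"    print(convert_dataframe_to_list_of_dictionaries(chunk, True))"
]
},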
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"jupyter": {
"outputs_hidden": false,
"source_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
},
"outputs": [],
"source": [
"# Data ingestion to LA custom table\n",
"async with get_credential() as credential, LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True) as client:\n",
" try:\n",
" for ind, df in enumerate(partition_dataframe_for_data_infestion(df_merged)):\n",
" body = convert_dataframe_to_list_of_dictionaries(df, True)\n",
" print(f\"{ind}: {df.shape[0]}\")\n",
" ingestion_result = await client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)\n",
" except HttpResponseError as e:\n",
" print(f\"Data ingestion failed: {e}\")"
]
}
],
"metadata": {
"description": null,
"kernelspec": {
"display_name": "Synapse PySpark",
"name": "synapse_pyspark"
},
"language_info": {
"name": "python"
},
"save_output": true,
"synapse_widget": {
"state": {},
"version": "0.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}