{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Scheduled: Hunting - Automated Data Query and MDTI API and Ingestion to Custom Table\n",
"\n",
"__Notebook Version:__ 1.0<br>\n",
"__Python Version:__ Python 3.8<br>\n",
"__Apache Spark Version:__ 3.1<br>\n",
"__Required Packages:__ azure-monitor-query, azure-mgmt-loganalytics<br>\n",
"__Platforms Supported:__ Azure Synapse Analytics\n",
" \n",
"__Data Source Required:__ Log Analytics custom table defined\n",
" \n",
"### Description\n",
"This notebook provides step-by-step instructions and sample code to query various data from Azure Log Analytics and then store it back to Log Analytocs pre-defined custom table.<br>\n",
"*** Please run the cells sequentially to avoid errors. Please do not use \"run all cells\". *** <br>\n",
"Need to know more about KQL? [Getting started with Kusto Query Language](https://docs.microsoft.com/azure/data-explorer/kusto/concepts/).\n",
"\n",
"## Table of Contents\n",
"1. Warm-up\n",
"2. Azure Log Analytics Data Queries\n",
"3. Save result to Azure Log Analytics Custom Table"
],
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"## 1. Warm-up"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Load Python libraries that will be used in this notebook\n",
"from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n",
"from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n",
"from azure.monitor.ingestion import LogsIngestionClient\n",
"from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n",
"from azure.core.exceptions import HttpResponseError \n",
"\n",
"import sys\n",
"from datetime import datetime, timezone, timedelta\n",
"import requests\n",
"import pandas as pd\n",
"import numpy\n",
"import json\n",
"import math\n",
"import ipywidgets\n",
"from IPython.display import display, HTML, Markdown"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# User input for Log Analytics workspace as the data source for querying\r\n",
"subscription_id_source = \"\"\r\n",
"resource_group_name_source = \"\"\r\n",
"workspace_name_source = \"\"\r\n",
"workspace_id_source = \"\"\r\n",
"workspace_resource_id_source = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id_source, resource_group_name_source, workspace_name_source)\r\n"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# User input for Log Analytics workspace for data ingestion\r\n",
"resource_group_name = \"\"\r\n",
"location = \"\"\r\n",
"workspace_name = \"\"\r\n",
"tenant_id = \"\"\r\n",
"subscription_id = \"\"\r\n",
"workspace_id = \"\"\r\n",
"workspace_resource_id = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n",
"data_collection_endpoint_name = \"\"\r\n",
"data_collection_rule_name = \"\"\r\n",
"custom_table_name = \"\"\r\n",
"stream_name = \"Custom-\" + custom_table_name\r\n",
"immutable_rule_id = \"\"\r\n",
"dce_endpoint = \"\"\r\n",
"\r\n",
"akv_name = \"\"\r\n",
"client_id_name = \"\"\r\n",
"client_secret_name = \"\"\r\n",
"akv_link_name = \"\""
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# You may need to change resource_uri for various cloud environments.\r\n",
"resource_uri = \"https://api.loganalytics.io\"\r\n",
"client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n",
"client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n",
"\r\n",
"credential = ClientSecretCredential(\r\n",
" tenant_id=tenant_id, \r\n",
" client_id=client_id, \r\n",
" client_secret=client_secret)\r\n",
"access_token = credential.get_token(resource_uri + \"/.default\")\r\n",
"token = access_token[0]"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
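{
"cell_type": "markdown",
"source": [
"The next cell is an optional, illustrative sketch: if you do not already know the `immutable_rule_id` for your data collection rule, you can read the DCR's `immutableId` (and the resource id of its linked data collection endpoint) from the Azure Resource Manager REST API using the service principal above, provided it has read access to the resource group. The `api-version` value is an assumption; use a version supported in your cloud environment."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Optional (a sketch, not required if you already filled in the values above): look up the DCR immutable id and its data collection endpoint id\r\n",
"arm_root = \"https://management.azure.com\"\r\n",
"arm_token = credential.get_token(arm_root + \"/.default\")[0]\r\n",
"dcr_url = \"{0}/subscriptions/{1}/resourceGroups/{2}/providers/Microsoft.Insights/dataCollectionRules/{3}?api-version=2022-06-01\".format(arm_root, subscription_id, resource_group_name, data_collection_rule_name)\r\n",
"dcr_response = requests.get(dcr_url, headers={\"Authorization\": \"Bearer {}\".format(arm_token)})\r\n",
"if dcr_response.ok:\r\n",
"    dcr_properties = dcr_response.json().get(\"properties\", {})\r\n",
"    print(\"immutableId:\", dcr_properties.get(\"immutableId\"))\r\n",
"    print(\"dataCollectionEndpointId:\", dcr_properties.get(\"dataCollectionEndpointId\"))\r\n",
"else:\r\n",
"    print(\"Could not read the DCR:\", dcr_response.status_code)"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},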
{
"cell_type": "markdown",
"source": [
"## 2. Azure Log Analytics Data Queries"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Functions for query\r\n",
"def query_la(workspace_id_query, query):\r\n",
" la_data_client = LogsQueryClient(credential=credential)\r\n",
" end_time = datetime.now(timezone.utc)\r\n",
" start_time = end_time - timedelta(15)\r\n",
"\r\n",
" query_result = la_data_client.query_workspace(\r\n",
" workspace_id=workspace_id_query,\r\n",
" query=query,\r\n",
" timespan=(start_time, end_time))\r\n",
" \r\n",
" df_la_query = pd.DataFrame\r\n",
"\r\n",
" if query_result.status == LogsQueryStatus.SUCCESS:\r\n",
" if hasattr(query_result, 'tables'):\r\n",
" data = query_result.tables\r\n",
" if len(query_result.tables) > 1:\r\n",
" print('You have more than one tyable to processs')\r\n",
" elif query_result.status == LogsQueryStatus.PARTIAL:\r\n",
" data=query_result.partial_data\r\n",
" print(query_result.partial_error)\r\n",
" else:\r\n",
" print(query_result.error)\r\n",
" \r\n",
" if len(query_result.tables) > 1:\r\n",
" print('You have more than one tyable to processs')\r\n",
" for table in data:\r\n",
" df_la_query = pd.DataFrame(data=table.rows, columns=table.columns)\r\n",
" return df_la_query\r\n",
"\r\n",
"def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):\r\n",
" \"Slice the time to render records <= 500K\"\r\n",
" count_query = query.format(lookback_start, lookback_unit, lookback_end)\r\n",
" count = ' | summarize count()'\r\n",
" count_query = count_query + count\r\n",
" df_count = query_la(workspace_id_source, count_query)\r\n",
" row_count = df_count['count_'][0]\r\n",
" print(count_query)\r\n",
" print(row_count)\r\n",
" df_final = pd.DataFrame()\r\n",
"\r\n",
" if row_count > query_row_limit:\r\n",
" number_of_divide = 0\r\n",
" while row_count > query_row_limit:\r\n",
" row_count = row_count / split_factor\r\n",
" number_of_divide = number_of_divide + 1\r\n",
"\r\n",
" factor = split_factor ** number_of_divide\r\n",
" step_number = math.ceil(int(lookback_start) / factor)\r\n",
" if factor > int(lookback_start) and lookback_unit == 'h':\r\n",
" lookback_unit = 'm'\r\n",
" number_of_minutes = 60\r\n",
" step_number = math.ceil(int(lookback_start)*number_of_minutes / factor)\r\n",
"\r\n",
" try:\r\n",
" for i in range(int(lookback_end), factor + 1, 1):\r\n",
" if i > 0:\r\n",
" df_la_query = pd.DataFrame\r\n",
" current_query = query.format(i * step_number, lookback_unit, (i - 1) * step_number)\r\n",
" print(current_query)\r\n",
" df_la_query = query_la(workspace_id_source, current_query)\r\n",
" print(df_la_query.shape[0])\r\n",
" df_final = pd.concat([df_final, df_la_query])\r\n",
" except:\r\n",
" print(\"query failed\")\r\n",
" raise\r\n",
" else:\r\n",
" df_final = query_la(workspace_id_source, query.format(lookback_start, lookback_unit, lookback_end))\r\n",
"\r\n",
" return df_final"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
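{
"cell_type": "markdown",
"source": [
"As an illustration of how `slice_query_la` splits a lookback window (hypothetical numbers, not tied to your workspace): with `query_row_limit=400000` and `split_factor=2`, a row count of 1,500,000 needs two halvings, so a 24-hour window is split into 2^2 = 4 sub-queries of 6 hours each. The optional cell below reproduces that arithmetic."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Illustration only: the slicing arithmetic used by slice_query_la, on hypothetical numbers\r\n",
"demo_row_count = 1500000\r\n",
"demo_row_limit = 400000\r\n",
"demo_split_factor = 2\r\n",
"demo_lookback_hours = 24\r\n",
"\r\n",
"demo_divides = 0\r\n",
"while demo_row_count > demo_row_limit:\r\n",
"    demo_row_count = demo_row_count / demo_split_factor\r\n",
"    demo_divides += 1\r\n",
"\r\n",
"demo_factor = demo_split_factor ** demo_divides\r\n",
"demo_step = math.ceil(demo_lookback_hours / demo_factor)\r\n",
"print(\"{0} sub-queries of {1} hours each\".format(demo_factor, demo_step))"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},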
{
"cell_type": "markdown",
"source": [
"### Slice data for query"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Use test LA table\r\n",
"query_template = \"let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip\"\r\n",
"lookback_start = '24'\r\n",
"\r\n",
"df_final = slice_query_la(query_template, lookback_start)\r\n",
"print(df_final.shape[0])"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"### Service Data: MDTI API"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Call Microsoft MDTI API for List, the same template can be used for calling other Azure REST APIs with different parameters.\r\n",
"# For different environments, such as national clouds, you may need to use different root_url, please contact with your admins.\r\n",
"# It can be ---.azure.us, ---.azure.microsoft.scloud, ---.azure.eaglex.ic.gov, etc.\r\n",
"def call_mdti_api_for_read(token, resource):\r\n",
" \"Calling Microsoft MDTI API\"\r\n",
" headers = {\"Authorization\": token, \"content-type\":\"application/json\" }\r\n",
" root_url = \"https://graph.microsoft.com\"\r\n",
" mdti_url_template = \"{0}/beta/security/threatIntelligence/{1}\"\r\n",
" mdti_url = mdti_url_template.format(root_url, resource)\r\n",
" # print(mdti_url)\r\n",
" response = requests.get(mdti_url, headers=headers, verify=True)\r\n",
" return response\r\n",
"\r\n",
"def get_token_for_graph():\r\n",
" resource_uri = \"https://graph.microsoft.com\"\r\n",
" client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n",
" client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n",
"\r\n",
" credential = ClientSecretCredential(\r\n",
" tenant_id=tenant_id, \r\n",
" client_id=client_id, \r\n",
" client_secret=client_secret)\r\n",
" access_token = credential.get_token(resource_uri + \"/.default\")\r\n",
" return access_token[0]"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Call MDTI API, hosts as example\r\n",
"header_token_value = \"Bearer {}\".format(get_token_for_graph())\r\n",
"response_mdti_host = call_mdti_api_for_read(header_token_value, \"hosts('www.microsoft.com')\")"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
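{
"cell_type": "markdown",
"source": [
"The merge step below reads the `registrar` field straight from the response body. The optional sanity check here (a sketch) confirms the call succeeded first; a non-2xx status usually means the app registration lacks the required Microsoft Graph Threat Intelligence permissions or the host was not found."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Optional sanity check (a sketch): confirm the MDTI call succeeded before merging\r\n",
"if response_mdti_host.ok:\r\n",
"    print(\"registrar:\", response_mdti_host.json().get(\"registrar\"))\r\n",
"else:\r\n",
"    print(\"MDTI request failed:\", response_mdti_host.status_code, response_mdti_host.text)"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},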
{
"cell_type": "code",
"source": [
"# Merge data\r\n",
"df_final.loc[df_final['ip'].str.startswith('23.'), 'Fact'] = response_mdti_host.json()[\"registrar\"]\r\n",
"df_merged = df_final.rename(columns = {'TimeGenerated': 'TimeGenerated', 'ip': 'IP', 'Fact': 'Fact'})[['TimeGenerated', 'IP', 'Fact']]"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"#df_merged"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"## 3. Save result to Azure Log Analytics Custom Table"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# function for data converting\r\n",
"def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):\r\n",
" list = df.to_dict('records')\r\n",
"\r\n",
" for row in list:\r\n",
" # The dataframe may have more than one datetime columns, add all datetiome columns inside this loop, to render ISO 8601\r\n",
" if hasTimeGeneratedColumn and row['TimeGenerated'] != None:\r\n",
" row['TimeGenerated']= row['TimeGenerated'].strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\")\r\n",
" \r\n",
" return list\r\n",
"\r\n",
"def check_dataframe_size_in_mb(df, size_limit_in_mb=25):\r\n",
" \"Check if dataframe has more than 25 MB data, 30 MB is the limit for POST\"\r\n",
" size_in_mb = sys.getsizeof(df) / 1000000\r\n",
" return size_in_mb / size_limit_in_mb\r\n",
"\r\n",
"def partition_dataframe_for_data_infestion(df):\r\n",
" df_size = check_dataframe_size_in_mb(df)\r\n",
" if df_size > 1:\r\n",
" partition_number = math.ceil(df_size)\r\n",
" index_block = len(df) // partition_number\r\n",
"\r\n",
" list_df = [df[i:i+index_block] for i in range(0,df.shape[0],index_block)]\r\n",
" return list_df\r\n",
" else:\r\n",
" return [df]"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
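{
"cell_type": "markdown",
"source": [
"The next cell is an optional, illustrative run of the helpers above on a tiny synthetic dataframe (hypothetical values; it does not touch `df_merged`): small frames come back as a single partition, and `TimeGenerated` is rendered as an ISO 8601 string ready for ingestion."
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Illustration only: exercise the conversion and partitioning helpers on synthetic data\r\n",
"df_demo = pd.DataFrame({\r\n",
"    \"TimeGenerated\": [datetime.now(timezone.utc)],\r\n",
"    \"IP\": [\"203.0.113.1\"],\r\n",
"    \"Fact\": [\"example\"]\r\n",
"})\r\n",
"for df_part in partition_dataframe_for_data_ingestion(df_demo):\r\n",
"    print(convert_dataframe_to_list_of_dictionaries(df_part, True))"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},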
{
"cell_type": "code",
"source": [
"# Data ingestion to LA custom table\r\n",
"client = LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True)\r\n",
"\r\n",
"try:\r\n",
" ind = 0\r\n",
" list_df = partition_dataframe_for_data_infestion(df_merged)\r\n",
" for df in list_df:\r\n",
" body = convert_dataframe_to_list_of_dictionaries(df, True)\r\n",
" print(df.shape[0])\r\n",
" ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)\r\n",
" ind = ind + 1\r\n",
"except HttpResponseError as e:\r\n",
" print(f\"Data ingestion failed: {e}\")"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
}
],
"metadata": {
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"description": null,
"save_output": true,
"synapse_widget": {
"version": "0.1",
"state": {}
}
},
"nbformat": 4,
"nbformat_minor": 2
}