{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Hunting - Query Parquet Files and MDTI API and Ingestion to Custom Table\n",
"\n",
"__Notebook Version:__ 1.0<br>\n",
"__Python Version:__ Python 3.8<br>\n",
"__Apache Spark Version:__ 3.1<br>\n",
"__Required Packages:__ azure-monitor-query, azure-mgmt-loganalytics<br>\n",
"__Platforms Supported:__ Azure Synapse Analytics\n",
" \n",
"__Data Source Required:__ Log Analytics custom table defined\n",
" \n",
"### Description\n",
"This notebook provides step-by-step instructions and sample code to query parquet data from Azure Data Lake Storage and then store it back to Log Analytocs pre-defined custom table.<br>\n",
"*** Please run the cells sequentially to avoid errors. Please do not use \"run all cells\". *** <br>\n",
"\n",
"## Table of Contents\n",
"1. Warm-up\n",
"2. ADLS Parquet Data Queries\n",
"3. Save result to Azure Log Analytics Custom Table"
],
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"## 1. Warm-up"
],
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# Load Python libraries that will be used in this notebook\n",
"from azure.mgmt.loganalytics import LogAnalyticsManagementClient\n",
"from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus\n",
"from azure.monitor.ingestion import LogsIngestionClient\n",
"from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential\n",
"from azure.core.exceptions import HttpResponseError \n",
"\n",
"import functools\n",
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.types import *\n",
"\n",
"import sys\n",
"from datetime import datetime, timezone, timedelta\n",
"import requests\n",
"import pandas as pd\n",
"import numpy\n",
"import json\n",
"import math\n",
"import ipywidgets\n",
"from IPython.display import display, HTML, Markdown"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
{
"cell_type": "code",
"source": [
"# User input for Log Analytics workspace for data ingestion\r\n",
"tenant_id = \"\"\r\n",
"subscription_id = \"\"\r\n",
"workspace_id = \"\"\r\n",
"resource_group_name = \"\"\r\n",
"location = \"\"\r\n",
"workspace_name = \"\"\r\n",
"workspace_resource_id = \"/subscriptions/{0}/resourceGroups/{1}/providers/Microsoft.OperationalInsights/workspaces/{2}\".format(subscription_id, resource_group_name, workspace_name)\r\n",
"data_collection_endpoint_name = \"\"\r\n",
"data_collection_rule_name = \"\"\r\n",
"custom_table_name = \"\"\r\n",
"stream_name = \"Custom-\" + custom_table_name\r\n",
"immutable_rule_id = \"\"\r\n",
"dce_endpoint = \"\"\r\n",
"\r\n",
"akv_name = \"\"\r\n",
"client_id_name = \"\"\r\n",
"client_secret_name = \"\"\r\n",
"akv_link_name = \"\""
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Inputs for ADLS Parquet file path\r\n",
"stroage_account_name = \"\"\r\n",
"container_name = \"\"\r\n",
"folder_path = \"\"\r\n",
"lookback_hours = 8"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# You may need to change resource_uri for various cloud environments.\r\n",
"resource_uri = \"https://api.loganalytics.io\"\r\n",
"client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n",
"client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n",
"\r\n",
"credential = ClientSecretCredential(\r\n",
" tenant_id=tenant_id, \r\n",
" client_id=client_id, \r\n",
" client_secret=client_secret)\r\n",
"access_token = credential.get_token(resource_uri + \"/.default\")\r\n",
"token = access_token[0]"
],
"outputs": [],
"execution_count": null,
"metadata": {}
},
{
"cell_type": "markdown",
"source": [
"## 2. ADLS Data Queries"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"current_time = datetime.now()\r\n",
"lookback_time = datetime.now() - timedelta(hours = lookback_hours)\r\n",
"spark_session = SparkSession.builder.appName('Empty_Dataframe').getOrCreate()\r\n",
"\r\n",
"def unionAll(dfs):\r\n",
" return functools.reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs)\r\n",
"\r\n",
"i = 0 \r\n",
"for file_info in list_file:\r\n",
" if file_info.isDir:\r\n",
" modified_time = datetime.fromtimestamp(file_info.modifyTime / 1e3)\r\n",
" if modified_time >= lookback_time and modified_time < datetime.now():\r\n",
" print(file_info.name)\r\n",
" path = 'abfss://{0}@{1}.dfs.core.windows.net/{2}/{3}'.format(container_name, stroage_account_name, folder_path, file_info.name)\r\n",
" print(path)\r\n",
" df_parquet = spark.read.parquet(path)\r\n",
" print(df_parquet.count())\r\n",
" if i == 0:\r\n",
" df_spark = df_parquet\r\n",
" i = i + 1\r\n",
" else: \r\n",
" df_spark = unionAll([df_spark, df_parquet])\r\n",
" \r\n",
" "
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"df_final = df_spark.toPandas()\r\n",
"df_final.shape[0]"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"### Service Data: MDTI API"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Calling Microsoft MDTI API for List, the same template can be used for calling other Azure REST APIs with different parameters.\r\n",
"# For different environments, such as national clouds, you may need to use different root_url, please contact with your admins.\r\n",
"# It can be ---.azure.us, ---.azure.microsoft.scloud, ---.azure.eaglex.ic.gov, etc.\r\n",
"def call_mdti_api_for_read(token, resource):\r\n",
" \"Calling Microsoft MDTI API\"\r\n",
" headers = {\"Authorization\": token, \"content-type\":\"application/json\" }\r\n",
" root_url = \"https://graph.microsoft.com\"\r\n",
" mdti_url_template = \"{0}/beta/security/threatIntelligence/{1}\"\r\n",
" mdti_url = mdti_url_template.format(root_url, resource)\r\n",
" # print(mdti_url)\r\n",
" try:\r\n",
" response = requests.get(mdti_url, headers=headers, verify=True)\r\n",
" return response\r\n",
" except HttpResponseError as e:\r\n",
" print(f\"Calling MDTI API failed: {e}\")\r\n",
" return None\r\n",
"\r\n",
"def get_token_for_graph():\r\n",
" resource_uri = \"https://graph.microsoft.com\"\r\n",
" client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)\r\n",
" client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)\r\n",
"\r\n",
" credential = ClientSecretCredential(\r\n",
" tenant_id=tenant_id, \r\n",
" client_id=client_id, \r\n",
" client_secret=client_secret)\r\n",
" access_token = credential.get_token(resource_uri + \"/.default\")\r\n",
" return access_token[0]"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Calling MDTI API, hosts as example\r\n",
"header_token_value = \"Bearer {}\".format(get_token_for_graph())\r\n",
"response_mdti_host = call_mdti_api_for_read(header_token_value, \"hosts('www.microsoft.com')\")"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"df_final.loc[df_final['ip'].str.startswith('23.'), 'Fact'] = response_mdti_host.json()[\"registrar\"]"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"df_merged = df_final.rename(columns = {'TimeGenerated': 'TimeGenerated', 'ip': 'IP', 'Fact': 'Fact'})[['TimeGenerated', 'IP', 'Fact']]"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "markdown",
"source": [
"## 3. Save result to Azure Log Analytics Custom Table"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# function for data converting\r\n",
"def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):\r\n",
" list = df.to_dict('records')\r\n",
"\r\n",
" for row in list:\r\n",
" # The dataframe may have more than one datetime columns, add all datetiome columns inside this loop, to render ISO 8601\r\n",
" if hasTimeGeneratedColumn and row['TimeGenerated'] != None:\r\n",
" row['TimeGenerated']= row['TimeGenerated'].strftime(\"%Y-%m-%dT%H:%M:%S.%fZ\")\r\n",
" \r\n",
" return list\r\n",
"\r\n",
"def check_dataframe_size_in_mb(df, size_limit_in_mb=25):\r\n",
" \"Check if dataframe has more than 25 MB data, 30 MB is the limit for POST\"\r\n",
" size_in_mb = sys.getsizeof(df) / 1000000\r\n",
" return size_in_mb / size_limit_in_mb\r\n",
"\r\n",
"def partition_dataframe_for_data_infestion(df):\r\n",
" df_size = check_dataframe_size_in_mb(df)\r\n",
" if df_size > 1:\r\n",
" partition_number = math.ceil(df_size)\r\n",
" index_block = len(df) // partition_number\r\n",
"\r\n",
" list_df = [df[i:i+index_block] for i in range(0,df.shape[0],index_block)]\r\n",
" return list_df\r\n",
" else:\r\n",
" return [df]"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"# Data ingestion to LA custom table\r\n",
"client = LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True)\r\n",
"\r\n",
"try:\r\n",
" ind = 0\r\n",
" list_df = partition_dataframe_for_data_infestion(df_merged)\r\n",
" for df in list_df:\r\n",
" body = convert_dataframe_to_list_of_dictionaries(df, True)\r\n",
" print(ind)\r\n",
" print(df.shape[0])\r\n",
" ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)\r\n",
" ind = ind + 1\r\n",
"except HttpResponseError as e:\r\n",
" print(f\"Data ingestion failed: {e}\")"
],
"outputs": [],
"execution_count": null,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
}
}
}
],
"metadata": {
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "Synapse PySpark"
},
"language_info": {
"name": "python"
},
"description": null,
"save_output": true,
"synapse_widget": {
"version": "0.1",
"state": {}
}
},
"nbformat": 4,
"nbformat_minor": 2
}