# Hunting - Query Parquet Files and MDTI API and Ingestion to Custom Table

__Notebook Version:__ 1.0<br>
__Python Version:__ Python 3.8<br>
__Apache Spark Version:__ 3.1<br>
__Required Packages:__ azure-monitor-query, azure-monitor-ingestion, azure-identity, azure-mgmt-loganalytics<br>
__Platforms Supported:__  Azure Synapse Analytics
     
__Data Source Required:__ Log Analytics custom table defined
    
### Description
This notebook provides step-by-step instructions and sample code to query parquet data from Azure Data Lake Storage, enrich it with data from the MDTI API, and then store the result in a pre-defined Log Analytics custom table.<br>
*** Please run the cells sequentially to avoid errors.  Please do not use "run all cells". *** <br>

## Table of Contents
1. Warm-up
2. ADLS Parquet Data Queries
3. Save result to Azure Log Analytics Custom Table

## 1. Warm-up

In [None]:
# Load Python libraries that will be used in this notebook
from azure.mgmt.loganalytics import LogAnalyticsManagementClient
from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential
from azure.core.exceptions import  HttpResponseError 

import functools
from pyspark.sql import SparkSession
from pyspark.sql.types import *

import sys
from datetime import datetime, timezone, timedelta
import requests
import pandas as pd
import numpy
import json
import math
import ipywidgets
from IPython.display import display, HTML, Markdown

In [None]:
# Log Analytics workspace settings for data ingestion -- fill these in before running
tenant_id = ""
subscription_id = ""
workspace_id = ""
resource_group_name = ""
location = ""
workspace_name = ""
# ARM resource ID of the workspace, assembled from the subscription, resource group and workspace name
workspace_resource_id = (
    f"/subscriptions/{subscription_id}"
    f"/resourceGroups/{resource_group_name}"
    f"/providers/Microsoft.OperationalInsights/workspaces/{workspace_name}"
)

# Data collection endpoint / rule and the destination custom table
data_collection_endpoint_name = ""
data_collection_rule_name = ""
custom_table_name = ""
# The stream name is the custom table name prefixed with "Custom-"
stream_name = f"Custom-{custom_table_name}"
# Passed as rule_id to the ingestion client's upload call
immutable_rule_id = ""
# Passed as the endpoint of the ingestion client
dce_endpoint = ""

# Key Vault name, secret names and link name passed to mssparkutils.credentials.getSecret
akv_name = ""
client_id_name = ""
client_secret_name = ""
akv_link_name = ""

In [None]:
# Inputs for ADLS Parquet file path
# The storage account, container and folder are combined into the abfss:// path the notebook reads from.
# NOTE(review): "stroage_account_name" misspells "storage", but the same spelling is used where the
# path is built, so the two must be renamed together.
stroage_account_name = ""
container_name = ""
folder_path = ""
# Only folders modified within this many hours before the current time are loaded.
lookback_hours = 8

In [None]:
# Token audience for the Log Analytics API.
# You may need to change resource_uri for various cloud environments.
resource_uri = "https://api.loganalytics.io"

# The service principal's client ID and secret are fetched from Key Vault by secret name.
client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)

credential = ClientSecretCredential(tenant_id=tenant_id, client_id=client_id, client_secret=client_secret)

# Request a token for the "<resource_uri>/.default" scope; element 0 of the result is kept as `token`.
access_token = credential.get_token(f"{resource_uri}/.default")
token = access_token[0]

## 2. ADLS Parquet Data Queries

In [None]:
# Time window: only folders modified in the last `lookback_hours` hours are of interest.
# Both bounds come from one clock reading so the window is exactly `lookback_hours` long.
current_time = datetime.now()
lookback_time = current_time - timedelta(hours=lookback_hours)
# getOrCreate() hands back the already-running Spark session when one exists.
# NOTE(review): the parquet reads in this cell go through the name `spark`, not `spark_session` --
# presumably the session Synapse pre-defines; confirm.
spark_session = SparkSession.builder.appName('Empty_Dataframe').getOrCreate()

def unionAll(dfs):
    """Combine a sequence of DataFrames into one by successive unions.

    Every DataFrame after the first is projected onto the accumulated
    frame's columns (in that order) before being unioned, so the inputs may
    list the same columns in a different order.

    Args:
        dfs: Non-empty iterable of DataFrames exposing ``columns``,
            ``select`` and ``union``.

    Returns:
        The first DataFrame itself when only one is given, otherwise the
        union of all of them.
    """
    def _append(combined, following):
        aligned = following.select(combined.columns)
        return combined.union(aligned)

    return functools.reduce(_append, dfs)

# List the entries directly under the configured ADLS folder. Each entry exposes the
# name / isDir / modifyTime attributes that the loop below reads.
root_path = 'abfss://{0}@{1}.dfs.core.windows.net/{2}'.format(container_name, stroage_account_name, folder_path)
list_file = mssparkutils.fs.ls(root_path)

# Read every sub-folder modified inside the lookback window as a parquet dataset.
recent_frames = []
for file_info in list_file:
    if not file_info.isDir:
        continue
    # modifyTime is divided by 1e3 to convert milliseconds to the seconds fromtimestamp() expects.
    modified_time = datetime.fromtimestamp(file_info.modifyTime / 1e3)
    if not (lookback_time <= modified_time < current_time):
        continue
    print(file_info.name)
    path = '{0}/{1}'.format(root_path, file_info.name)
    print(path)
    df_parquet = spark.read.parquet(path)
    print(df_parquet.count())
    recent_frames.append(df_parquet)

# Fail here with a clear message instead of leaving df_spark undefined for the next cell.
if not recent_frames:
    raise ValueError(
        "No folders under {0} were modified in the last {1} hours; nothing to load.".format(root_path, lookback_hours)
    )

# Union all matching datasets, aligning each one to the first dataset's column order.
df_spark = unionAll(recent_frames)
            
            

In [None]:
# Convert the combined Spark DataFrame to a pandas DataFrame.
df_final = df_spark.toPandas()
# Number of rows collected (displayed as the cell output).
len(df_final.index)

### Service Data: MDTI API

In [None]:
# Call the Microsoft MDTI API to read a resource. The same template can be used to call other Azure REST APIs with different parameters.
# Other environments, such as national clouds, may need a different root_url; please check with your administrators.
# It can be ---.azure.us, ---.azure.microsoft.scloud, ---.azure.eaglex.ic.gov, etc.
def call_mdti_api_for_read(token, resource, timeout=60):
    """Send a GET request to the Microsoft Graph threat-intelligence (MDTI) API.

    Args:
        token: Value of the ``Authorization`` header, sent exactly as given
            (the caller supplies any ``"Bearer "`` prefix).
        resource: Path appended after ``/beta/security/threatIntelligence/``,
            for example ``"hosts('www.microsoft.com')"``.
        timeout: Seconds to wait for the server before giving up, so a
            stalled connection cannot hang the notebook indefinitely.

    Returns:
        The ``requests.Response`` whatever its HTTP status code, or ``None``
        when the request itself could not be completed.
    """
    headers = {"Authorization": token, "content-type": "application/json"}
    root_url = "https://graph.microsoft.com"
    mdti_url = "{0}/beta/security/threatIntelligence/{1}".format(root_url, resource)
    try:
        return requests.get(mdti_url, headers=headers, verify=True, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # requests reports connection, timeout and protocol failures through its own
        # exception hierarchy; it never raises azure-core's HttpResponseError.
        print(f"Calling MDTI API failed: {e}")
        return None

def get_token_for_graph():
    """Acquire a Microsoft Graph access token for the service principal.

    The client ID and client secret are read from Key Vault through
    ``mssparkutils.credentials.getSecret`` and used to build a
    ``ClientSecretCredential`` for ``tenant_id``.

    Returns:
        Element 0 of the result of ``get_token`` for the
        ``https://graph.microsoft.com/.default`` scope.
    """
    graph_uri = "https://graph.microsoft.com"
    sp_client_id = mssparkutils.credentials.getSecret(akv_name, client_id_name, akv_link_name)
    sp_client_secret = mssparkutils.credentials.getSecret(akv_name, client_secret_name, akv_link_name)

    graph_credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=sp_client_id,
        client_secret=sp_client_secret,
    )
    graph_access_token = graph_credential.get_token(f"{graph_uri}/.default")
    return graph_access_token[0]

In [None]:
# Example MDTI call: read the host record for www.microsoft.com
# The Authorization header value is the Graph token with a "Bearer " prefix.
header_token_value = f"Bearer {get_token_for_graph()}"
mdti_resource = "hosts('www.microsoft.com')"
response_mdti_host = call_mdti_api_for_read(header_token_value, mdti_resource)

In [None]:
# Write the registrar returned by the MDTI host lookup into 'Fact' for rows whose IP starts with "23.".
registrar = response_mdti_host.json()["registrar"]
# na=False counts a missing IP as "no match"; without it the mask contains NaN and .loc raises.
ip_starts_with_23 = df_final['ip'].str.startswith('23.', na=False)
df_final.loc[ip_starts_with_23, 'Fact'] = registrar

In [None]:
# Keep only the three columns that are ingested, exposing 'ip' under the name 'IP'.
output_columns = ['TimeGenerated', 'IP', 'Fact']
df_merged = df_final.rename(columns={'ip': 'IP'})[output_columns]

## 3. Save result to Azure Log Analytics Custom Table

In [None]:
# function for data converting
def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):
    """Convert a pandas DataFrame into a list of per-row dictionaries.

    Args:
        df: DataFrame to convert.
        hasTimeGeneratedColumn: When true, each row's ``'TimeGenerated'``
            value is rendered as an ISO 8601 string.

    Returns:
        One dict per row. With ``hasTimeGeneratedColumn`` set, a missing
        timestamp (``None`` or ``NaT``) becomes ``None`` and every other
        value is formatted as ``YYYY-MM-DDTHH:MM:SS.ffffffZ``.

    Raises:
        KeyError: If ``hasTimeGeneratedColumn`` is true but the rows have no
            ``'TimeGenerated'`` key.
    """
    records = df.to_dict('records')
    if not hasTimeGeneratedColumn:
        return records

    # The dataframe may have more than one datetime column; format any others inside
    # this loop as well so they are rendered as ISO 8601.
    for row in records:
        value = row['TimeGenerated']
        if value is None or pd.isna(value):
            # NaT has no strftime(); store None so the row has a plain null instead.
            row['TimeGenerated'] = None
            continue
        # The "Z" suffix is appended without any timezone conversion.
        # NOTE(review): this assumes the values are already UTC -- confirm.
        row['TimeGenerated'] = value.strftime("%Y-%m-%dT%H:%M:%S.%fZ")

    return records

def check_dataframe_size_in_mb(df, size_limit_in_mb=25):
    """Return the size of *df* as a multiple of the size limit.

    The size is ``sys.getsizeof(df)`` expressed in megabytes (1 MB =
    1,000,000 bytes). A result above 1 means the object is larger than
    ``size_limit_in_mb``. The default limit is 25 MB, below the 30 MB limit
    for a POST.
    """
    bytes_per_mb = 1000000
    size_mb = sys.getsizeof(df) / bytes_per_mb
    return size_mb / size_limit_in_mb

def partition_dataframe_for_data_infestion(df):
    """Split *df* row-wise into chunks that each fit within the size limit.

    The number of partitions is the size ratio reported by
    ``check_dataframe_size_in_mb`` rounded up, and rows are sliced into
    consecutive blocks of ``len(df) // partitions`` rows.

    Args:
        df: pandas DataFrame to partition.

    Returns:
        ``[df]`` when the DataFrame is within the limit, otherwise a list of
        consecutive row slices that together cover every row.
    """
    df_size = check_dataframe_size_in_mb(df)
    if df_size <= 1:
        return [df]

    partition_number = math.ceil(df_size)
    # Floor division gives 0 when there are fewer rows than partitions, and a zero
    # step would make range() raise, so use at least one row per block.
    index_block = max(1, len(df) // partition_number)

    # iloc slices by position whatever the index type; plain df[a:b] is label-based on a float index.
    return [df.iloc[start:start + index_block] for start in range(0, len(df), index_block)]

In [None]:
# Upload df_merged to the Log Analytics custom table through the data collection endpoint
client = LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True)

# Partition first so each upload carries one size-limited chunk
list_df = partition_dataframe_for_data_infestion(df_merged)
ind = 0
try:
    for df in list_df:
        body = convert_dataframe_to_list_of_dictionaries(df, True)
        # Progress output: the chunk index, then the number of rows in the chunk
        print(ind)
        print(len(df.index))
        ingestion_result = client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)
        ind += 1
except HttpResponseError as e:
    # The first failed upload stops the loop; the remaining chunks are not sent
    print(f"Data ingestion failed: {e}")