# AIO: Hunting - Automated Data Query and MDTI API and Ingestion to Custom Table

__Notebook Version:__ 1.0<br>
__Python Version:__ Python 3.8<br>
__Apache Spark Version:__ 3.1<br>
__Required Packages:__ azure-monitor-query, azure-monitor-ingestion, azure-identity, azure-mgmt-loganalytics<br>
__Platforms Supported:__  Azure Synapse Analytics
     
__Data Source Required:__ Log Analytics custom table defined
    
### Description
This notebook provides step-by-step instructions and sample code to query data from Azure Log Analytics, enrich it with the MDTI API, and store the result in a pre-defined Log Analytics custom table, using asyncio for concurrency.<br>
*** Please run the cells sequentially to avoid errors.  Please do not use "run all cells". *** <br>
Need to know more about KQL? [Getting started with Kusto Query Language](https://docs.microsoft.com/azure/data-explorer/kusto/concepts/).

## Table of Contents
1. Warm-up
2. Azure Log Analytics Data Queries
3. Save result to Azure Log Analytics Custom Table

## 1. Warm-up

In [None]:
# Load Python libraries that will be used in this notebook
from azure.monitor.query import LogsQueryStatus
from azure.monitor.query.aio import LogsQueryClient, MetricsQueryClient
from azure.monitor.ingestion.aio import LogsIngestionClient

from azure.identity.aio import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential
from azure.core.exceptions import HttpResponseError

import sys
from datetime import datetime, timezone, timedelta
import requests
import pandas as pd
import numpy
import json
import math
import ipywidgets
from IPython.display import display, HTML, Markdown

import asyncio

# Optionally use aiometer for granular throttling
# import functools
# !pip install aiometer
# import aiometer

In [None]:
# Azure KeyVault details
# Fill these in before running; get_credential() reads all five values.
tenant_id = ""  # passed as tenant_id to ClientSecretCredential
akv_name = ""  # first argument to mssparkutils.credentials.getSecret (presumably the Key Vault name)
client_id_name = ""  # name of the secret that is fetched and used as the client ID
client_secret_name = ""  # name of the secret that is fetched and used as the client secret
akv_link_name = ""  # third argument to getSecret (presumably the Key Vault linked service name - confirm)

# Get credential for Azure
def get_credential():
    """Build a ClientSecretCredential from secrets fetched via mssparkutils.

    The client ID and client secret are looked up by name (``client_id_name``
    and ``client_secret_name``) using the module-level ``akv_name`` and
    ``akv_link_name`` settings; ``tenant_id`` is taken from the module-level
    setting directly.

    Returns:
        A new ``ClientSecretCredential``. Callers in this notebook use it as
        an async context manager.
    """
    secret_store = mssparkutils.credentials
    app_id = secret_store.getSecret(akv_name, client_id_name, akv_link_name)
    app_secret = secret_store.getSecret(akv_name, client_secret_name, akv_link_name)

    credential = ClientSecretCredential(
        tenant_id=tenant_id,
        client_id=app_id,
        client_secret=app_secret,
    )
    return credential

## 2. Azure Log Analytics Data Queries

In [None]:
# User input for Log Analytics workspace as the data source for querying
# slice_query_la() passes this value to query_la() as the workspace ID for every query.
workspace_id_source = ""

In [None]:
async def throttle_gather(tasks, concurrent=5):
    """Await every awaitable in ``tasks`` with at most ``concurrent`` in flight.

    Args:
        tasks: Iterable of awaitables (a lazy generator is fine; items are
            pulled one at a time as slots become free).
        concurrent: Maximum number of awaitables running at the same time.

    Returns:
        List of results in the same order as ``tasks``.
    """
    gate = asyncio.BoundedSemaphore(concurrent)

    async def run_then_release(awaitable):
        # The slot was taken by the scheduling loop below; always give it back,
        # even when the awaitable raises.
        try:
            return await awaitable
        finally:
            gate.release()

    scheduled = []
    for awaitable in tasks:
        # Wait for a free slot before pulling the next awaitable into a task.
        await gate.acquire()
        scheduled.append(asyncio.create_task(run_then_release(awaitable)))

    return await asyncio.gather(*scheduled)

# Functions for query
async def query_la(la_data_client, workspace_id_query, query):
    """Run a KQL query against a Log Analytics workspace and return its first table.

    Args:
        la_data_client: Client exposing an awaitable ``query_workspace``
            method (this notebook passes an async ``LogsQueryClient``).
        workspace_id_query: ID of the workspace to query.
        query: KQL query text.

    Returns:
        A ``pandas.DataFrame`` built from the first result table. For a
        partial result the partial data is used and the partial error is
        printed. An empty DataFrame is returned when the query produced no
        tables or did not succeed.
    """
    print(query)
    end_time = datetime.now(timezone.utc)
    # timedelta's first positional argument is days: the query timespan is
    # always the last 15 days, independent of any ago() filter in the query.
    start_time = end_time - timedelta(15)

    query_result = await la_data_client.query_workspace(
        workspace_id=workspace_id_query,
        query=query,
        timespan=(start_time, end_time))

    if query_result.status == LogsQueryStatus.SUCCESS:
        tables = getattr(query_result, 'tables', None) or []
    elif query_result.status == LogsQueryStatus.PARTIAL:
        # Partial results carry their tables in partial_data; do not touch
        # .tables here, it is only read on the success path.
        tables = query_result.partial_data or []
        print(query_result.partial_error)
    else:
        print(getattr(query_result, 'error', None))
        tables = []

    if len(tables) > 1:
        print('You have more than one table to processs')

    if not tables:
        return pd.DataFrame()

    # Only the first table is converted; any additional tables are ignored.
    first_table = tables[0]
    return pd.DataFrame(data=first_table.rows, columns=first_table.columns)

async def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):
    """Run ``query`` over a lookback window, slicing the window when it is too large.

    A ``| count`` version of the query is run first. If the count is within
    ``query_row_limit`` the query is run once; otherwise the window is cut
    into equal time slices that are queried concurrently and concatenated.

    Args:
        query: KQL template with three ``str.format`` placeholders: ``{0}``
            the older bound, ``{1}`` the time unit, ``{2}`` the newer bound
            (e.g. ``ago({0}{1})`` ... ``ago({2}{1})``).
        lookback_start: How far back the window starts (integer, as str or int).
        lookback_end: How far back the window ends; ``'0'`` means now.
        lookback_unit: KQL timespan unit suffix. ``'h'`` is switched to
            ``'m'`` when more slices than hours are needed.
        query_row_limit: Target maximum number of rows per query.
        split_factor: The slice count is multiplied by this factor until the
            estimated rows per slice fit the limit. Must be greater than 1.

    Returns:
        A ``pandas.DataFrame`` with the query result (concatenated slices when
        the window was split; empty when no slice returned data).

    Raises:
        ValueError: If slicing is required and ``split_factor`` is not > 1.
    """
    start = int(lookback_start)
    end = int(lookback_end)

    # Every query below must run inside this block: the client is closed as
    # soon as the async context manager exits.
    async with get_credential() as credential, LogsQueryClient(credential=credential) as la_data_client:
        full_query = query.format(lookback_start, lookback_unit, lookback_end)
        df_count = await query_la(la_data_client, workspace_id_source, full_query + ' | count')
        row_count = df_count['Count'][0]
        print(row_count)

        if row_count <= query_row_limit:
            return await query_la(la_data_client, workspace_id_source, full_query)

        if split_factor <= 1:
            raise ValueError("split_factor must be greater than 1")

        # Grow the slice count until the estimated rows per slice fit the limit.
        # This assumes rows are spread evenly over time, so it is only an estimate.
        factor = 1
        estimated_rows = row_count
        while estimated_rows > query_row_limit:
            estimated_rows = estimated_rows / split_factor
            factor = factor * split_factor

        span = start - end
        if factor > span and lookback_unit == 'h':
            # More slices than whole hours: work in minutes for finer slices.
            lookback_unit = 'm'
            number_of_minutes = 60
            start, end, span = start * number_of_minutes, end * number_of_minutes, span * number_of_minutes
        step = max(1, math.ceil(span / factor))

        # One query per slice, walking from the newer bound towards the older
        # one. The older bound is clamped so no slice reaches past the
        # requested window.
        slice_queries = (
            query_la(la_data_client, workspace_id_source,
                     query.format(min(newer + step, start), lookback_unit, newer))
            for newer in range(end, start, step))

        try:
            # Optionally use aiometer.run_all with functools.partial here for
            # more granular throttling (max_at_once / max_per_second).
            frames = await throttle_gather(slice_queries)
        except Exception:
            print("query failed")
            raise

        frames = [frame for frame in frames if frame is not None]
        if not frames:
            return pd.DataFrame()
        return pd.concat(frames)

### Slice data for query

In [None]:
# Use Dror's test LA table
# The template has three str.format placeholders that slice_query_la() fills in:
# {0} = older bound of the time window, {1} = time unit, {2} = newer bound.
query_template = "let t1 = SecurityAlert | extend ent = parse_json(Entities)| extend ip = tostring(ent[0]['Address']) | project-keep TimeGenerated, ip; let t2 = CommonSecurityLog | where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) | project ip = DestinationIP; t1 | join kind=innerunique t2 on ip"
# Passed as lookback_start; with the default unit 'h' this means the last 24 hours.
lookback_start = '24'

# Top-level await: this only works in a notebook cell, not in a plain Python module.
df_final = await slice_query_la(query_template, lookback_start)
# Number of rows returned.
print(df_final.shape[0])

In [None]:
# Bare expression as the last line of the cell, so the notebook renders the DataFrame.
df_final

### Service Data: MDTI API

In [None]:
# Calling Microsoft MDTI API for List, the same template can be used for calling other Azure REST APIs with different parameters.
# For other environments, such as national clouds, you may need a different root_url; contact your administrators for the correct value.
# It can be ---.azure.us, ---.azure.microsoft.scloud, ---.azure.eaglex.ic.gov, etc.
def call_mdti_api_for_read(token, resource, root_url="https://graph.microsoft.com", timeout=30):
    """Send a GET request to the Microsoft MDTI (threat intelligence) API.

    Args:
        token: Complete ``Authorization`` header value (e.g. ``"Bearer <token>"``).
        resource: Resource path appended after
            ``/beta/security/threatIntelligence/``, e.g. ``"hosts('www.microsoft.com')"``.
        root_url: Base URL of the Graph endpoint; override for other clouds.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``requests.Response`` on any HTTP reply (the status code is not
        checked here), or ``None`` when the request itself failed
        (connection error, timeout, invalid URL, ...).
    """
    headers = {"Authorization": token, "content-type": "application/json"}
    mdti_url_template = "{0}/beta/security/threatIntelligence/{1}"
    mdti_url = mdti_url_template.format(root_url.rstrip("/"), resource)
    # print(mdti_url)
    try:
        return requests.get(mdti_url, headers=headers, verify=True, timeout=timeout)
    except requests.exceptions.RequestException as e:
        # requests raises its own exception hierarchy, not azure's
        # HttpResponseError, so that is what must be caught here.
        print(f"Calling MDTI API failed: {e}")
        return None

async def get_token_for_graph():
    """Request a token for the Microsoft Graph ``/.default`` scope.

    A credential from ``get_credential()`` is opened as an async context
    manager for the duration of the request.

    Returns:
        The first element of the object returned by ``get_token``
        (presumably the raw token string - confirm against the credential API).
    """
    resource_uri = "https://graph.microsoft.com"
    scope = resource_uri + "/.default"
    async with get_credential() as graph_credential:
        token_result = await graph_credential.get_token(scope)
        raw_token = token_result[0]
    return raw_token

In [None]:
# Calling MDTI API, hosts as example
# Build the full Authorization header value; top-level await only works in a notebook cell.
header_token_value = "Bearer {}".format(await get_token_for_graph())
# NOTE(review): call_mdti_api_for_read() can return None, which the merge cell below does not check.
response_mdti_host = call_mdti_api_for_read(header_token_value, "hosts('www.microsoft.com')")

In [None]:
# Merge data
# Tag every row whose ip starts with '23.' with the registrar value from the
# MDTI response, then keep only the three output columns (ip renamed to IP).
registrar_fact = response_mdti_host.json()["registrar"]
ip_starts_with_23 = df_final['ip'].str.startswith('23.')
df_final.loc[ip_starts_with_23, 'Fact'] = registrar_fact
output_columns = ['TimeGenerated', 'IP', 'Fact']
df_merged = df_final.rename(columns={'ip': 'IP'})[output_columns]

In [None]:
# df_merged

## 3. Save result to Azure Log Analytics Custom Table

In [None]:
# User input for Log Analytics workspace for data ingestion
custom_table_name = ""  # name of the custom table; only used to build stream_name below
stream_name = "Custom-" + custom_table_name  # passed as stream_name to client.upload()
immutable_rule_id = ""  # passed as rule_id to client.upload()
dce_endpoint = ""  # passed as endpoint to LogsIngestionClient

In [None]:
# function for data converting
def convert_dataframe_to_list_of_dictionaries(df, hasTimeGeneratedColumn):
    """Convert a DataFrame to a list of row dicts, formatting ``TimeGenerated``.

    Args:
        df: DataFrame to convert.
        hasTimeGeneratedColumn: When true, each row's ``TimeGenerated`` value
            is rendered as an ISO 8601 string; the column must then exist.

    Returns:
        List with one dict per row. Missing timestamps (``None``, ``NaT``,
        ``NaN``) become ``None``.
    """
    listd = df.to_dict('records')
    if not hasTimeGeneratedColumn:
        return listd

    for row in listd:
        # The dataframe may have more than one datetime column; add all
        # datetime columns inside this loop to render them as ISO 8601.
        timestamp = row['TimeGenerated']
        if pd.isna(timestamp):
            # NaT/NaN have no usable strftime; normalise missing values to None.
            row['TimeGenerated'] = None
        else:
            # NOTE(review): 'Z' is appended literally, so this assumes the
            # value is already in UTC - confirm for tz-aware non-UTC data.
            row['TimeGenerated'] = timestamp.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
    return listd

def check_dataframe_size_in_mb(df, size_limit_in_mb=25):
    """Return the size of ``df`` as a fraction of ``size_limit_in_mb``.

    Despite the name, the result is a ratio, not a size in MB: a value above
    1 means ``df`` is larger than the limit. The size comes from
    ``sys.getsizeof`` and 1 MB is counted as 1,000,000 bytes. The default
    limit of 25 MB stays below the 30 MB POST limit.
    """
    bytes_per_mb = 1000000
    megabytes = sys.getsizeof(df) / bytes_per_mb
    fraction_of_limit = megabytes / size_limit_in_mb
    return fraction_of_limit

def partition_dataframe_for_data_infestion(df):
    """Yield ``df`` whole, or in row-wise chunks when it exceeds the size limit.

    The size ratio comes from ``check_dataframe_size_in_mb``. When it is above
    1, the rows are split into blocks of ``len(df) // ceil(ratio)`` rows (at
    least one row per block), so the number of chunks can be slightly higher
    than ``ceil(ratio)`` when the rows do not divide evenly.

    Args:
        df: DataFrame to partition.

    Yields:
        Either ``df`` itself or consecutive positional row slices of it.
    """
    df_size = check_dataframe_size_in_mb(df)
    if df_size <= 1:
        yield df
        return

    partition_number = math.ceil(df_size)
    # With fewer rows than partitions the integer division gives 0, and
    # range() rejects a step of 0; never go below one row per block.
    index_block = max(1, len(df) // partition_number)
    for i in range(0, len(df), index_block):
        yield df.iloc[i:i + index_block]

In [None]:
# Data ingestion to LA custom table
# Top-level "async with"/await: this only works in a notebook cell. The credential
# and the ingestion client are both closed when the block exits.
async with get_credential() as credential, LogsIngestionClient(endpoint=dce_endpoint, credential=credential, logging_enable=True) as client:
    try:
        # Upload one partition at a time; partitions are uploaded sequentially.
        for ind, df in enumerate(partition_dataframe_for_data_infestion(df_merged)):
            body = convert_dataframe_to_list_of_dictionaries(df, True)
            # Progress output: partition index and its row count.
            print(f"{ind}: {df.shape[0]}")
            # ingestion_result is assigned but not read anywhere in this cell.
            ingestion_result = await client.upload(rule_id=immutable_rule_id, stream_name=stream_name, logs=body)
    except HttpResponseError as e:
        # The first failed upload ends the loop; remaining partitions are not sent.
        print(f"Data ingestion failed: {e}")