# Automate Tools - Parquet Files Generator

__Notebook Version:__ 1.0<br>
__Python Version:__ Python 3.8<br>
__Apache Spark Version:__ 3.1<br>
__Required Packages:__ azure-monitor-query, azure-monitor-ingestion, azure-mgmt-loganalytics, azure-identity<br>
__Platforms Supported:__ Azure Synapse Analytics
    
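### Description

This notebook queries an Azure Log Analytics workspace, slicing the query time window when the result would exceed a row limit, converts the combined result to a Spark DataFrame, and saves it as Parquet files.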

## Table of Contents
1. Warm-up
2. Azure Log Analytics Data Queries
3. Save result as Parquet files

## 1. Warm-up

In [None]:
# Load Python libraries that will be used in this notebook
from azure.mgmt.loganalytics import LogAnalyticsManagementClient
from azure.monitor.query import LogsQueryClient, MetricsQueryClient, LogsQueryStatus
from azure.monitor.ingestion import LogsIngestionClient
from azure.identity import AzureCliCredential, DefaultAzureCredential, ClientSecretCredential
from azure.core.exceptions import  HttpResponseError 

import sys
from datetime import datetime, timezone, timedelta
import requests
import pandas as pd
import numpy
import json
import math
import ipywidgets
from IPython.display import display, HTML, Markdown

In [None]:
# Source Log Analytics workspace (the one that is queried) -- fill these in before running
subscription_id_source = ""
resource_group_name_source = ""
workspace_name_source = ""
workspace_id_source = ""

# Resource ID string assembled from the subscription, resource group and workspace name above
workspace_resource_id_source = (
    f"/subscriptions/{subscription_id_source}"
    f"/resourceGroups/{resource_group_name_source}"
    f"/providers/Microsoft.OperationalInsights/workspaces/{workspace_name_source}"
)


In [None]:
# Log Analytics workspace used for data ingestion -- fill these in before running
tenant_id = ""
subscription_id = ""
workspace_id = ""
resource_group_name = ""
location = ""
workspace_name = ''

# Resource ID string assembled from the subscription, resource group and workspace name above
workspace_resource_id = (
    f"/subscriptions/{subscription_id}"
    f"/resourceGroups/{resource_group_name}"
    f"/providers/Microsoft.OperationalInsights/workspaces/{workspace_name}"
)

# Data collection endpoint / rule and custom table settings
data_collection_endpoint_name = ""
data_collection_rule_name = ""
custom_table_name = ""
# The stream name is the custom table name with a "Custom-" prefix
stream_name = f"Custom-{custom_table_name}"
immutable_rule_id = ""
dce_endpoint = ""

# Names passed to mssparkutils.credentials.getSecret to look up the client ID and secret
akv_name = ""
client_id_name = ""
client_secret_name = ""
akv_link_name = ""

In [None]:
# The resource URI below may have to be changed for other cloud environments.
resource_uri = "https://api.loganalytics.io"

# Look up the client ID and client secret by name through mssparkutils
client_id = mssparkutils.credentials.getSecret(
    akv_name, client_id_name, akv_link_name
)
client_secret = mssparkutils.credentials.getSecret(
    akv_name, client_secret_name, akv_link_name
)

# Build the credential that query_la uses, then request a token for the
# "<resource_uri>/.default" scope and keep its first field (the token string).
credential = ClientSecretCredential(tenant_id, client_id, client_secret)
access_token = credential.get_token(f"{resource_uri}/.default")
token = access_token.token

## 2. Azure Log Analytics Data Queries

In [None]:
# Functions for query
def query_la(workspace_id_query, query):
    """Run a query against a Log Analytics workspace and return the first table.

    The query is executed with a fixed timespan covering the 15 days up to
    the current UTC time, using the module-level ``credential``.

    Args:
        workspace_id_query: ID of the workspace to query.
        query: Query text to execute.

    Returns:
        A ``pandas.DataFrame`` built from the rows and columns of the first
        returned table. An empty DataFrame is returned when the result
        carries no tables (including the failure branch), so callers always
        receive a DataFrame.
    """
    la_data_client = LogsQueryClient(credential=credential)
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(15)  # first positional argument is days

    query_result = la_data_client.query_workspace(
        workspace_id=workspace_id_query,
        query=query,
        timespan=(start_time, end_time))

    # Read the tables only through the attribute that matches the status, and
    # always leave `tables` bound so the code below cannot hit a NameError.
    tables = []
    if query_result.status == LogsQueryStatus.SUCCESS:
        tables = getattr(query_result, 'tables', None) or []
    elif query_result.status == LogsQueryStatus.PARTIAL:
        tables = query_result.partial_data or []
        print(query_result.partial_error)
    else:
        print(getattr(query_result, 'error', None))

    # Only the first table is converted; warn (once) when more were returned.
    if len(tables) > 1:
        print('You have more than one tyable to processs')

    if not tables:
        return pd.DataFrame()

    first_table = tables[0]
    return pd.DataFrame(data=first_table.rows, columns=first_table.columns)

def slice_query_la(query, lookback_start, lookback_end='0', lookback_unit='h', query_row_limit=400000, split_factor=2):
    """Run a templated query, slicing its time window when it returns too many rows.

    The template is first run over the whole window with ``| summarize count()``
    appended. If the count is within ``query_row_limit`` the query is run
    once; otherwise the window is cut into consecutive slices that are
    queried one by one (via ``query_la`` against ``workspace_id_source``)
    and concatenated.

    Args:
        query: ``str.format`` template. ``{0}`` receives the lookback start,
            ``{1}`` the time unit and ``{2}`` the lookback end.
        lookback_start: Start of the lookback window (integer or numeric string).
        lookback_end: End of the lookback window (integer or numeric string).
        lookback_unit: Time unit suffix. ``'h'`` is switched to ``'m'`` when
            there are more slices than whole hours in the window.
        query_row_limit: Maximum row count to fetch with a single query.
        split_factor: The number of slices is the smallest power of this
            value that brings the average rows per slice under the limit.

    Returns:
        A ``pandas.DataFrame`` with the rows of all slices.

    Raises:
        ValueError: If the window must be split and ``split_factor`` is not
            greater than 1 (the slice count could never grow).
    """
    full_query = query.format(lookback_start, lookback_unit, lookback_end)
    count_query = full_query + ' | summarize count()'
    df_count = query_la(workspace_id_source, count_query)
    row_count = df_count['count_'][0]
    print(count_query)
    print(row_count)

    # Small enough: a single query over the whole window is sufficient.
    if row_count <= query_row_limit:
        return query_la(workspace_id_source, full_query)

    if split_factor <= 1:
        raise ValueError("split_factor must be greater than 1 to split the query window")

    # Keep dividing until the average rows per slice fits under the limit.
    number_of_divide = 0
    while row_count > query_row_limit:
        row_count = row_count / split_factor
        number_of_divide += 1
    factor = split_factor ** number_of_divide

    window_start = int(lookback_start)
    window_end = int(lookback_end)
    # More slices than whole hours: switch to minutes so the steps stay integral.
    if factor > window_start - window_end and lookback_unit == 'h':
        lookback_unit = 'm'
        window_start *= 60
        window_end *= 60
    step_number = math.ceil((window_start - window_end) / factor)

    frames = []
    try:
        # Walk from lookback_end towards lookback_start. Each slice is clamped
        # to lookback_start so the rounded-up step never queries outside the
        # window that was counted above.
        slice_end = window_end
        while slice_end < window_start:
            slice_start = min(slice_end + step_number, window_start)
            current_query = query.format(slice_start, lookback_unit, slice_end)
            print(current_query)
            df_la_query = query_la(workspace_id_source, current_query)
            print(df_la_query.shape[0])
            frames.append(df_la_query)
            slice_end = slice_start
    except Exception:
        print("query failed")
        raise

    # Concatenate once instead of re-copying the accumulated frame per slice.
    return pd.concat(frames) if frames else pd.DataFrame()

### Slice data for query

In [None]:
# Query template run against the test Log Analytics tables.
# {0} = lookback start, {1} = time unit, {2} = lookback end (filled in by slice_query_la).
query_template = (
    "let t1 = SecurityAlert | extend ent = parse_json(Entities)"
    "| extend ip = tostring(ent[0]['Address']) "
    "| project-keep TimeGenerated, ip; "
    "let t2 = CommonSecurityLog "
    "| where TimeGenerated > ago({0}{1}) and TimeGenerated <= ago({2}{1}) "
    "| project ip = DestinationIP; "
    "t1 | join kind=innerunique t2 on ip"
)
lookback_start = '4'

# Run the (possibly sliced) query and report how many rows came back
df_final = slice_query_la(query_template, lookback_start)
print(df_final.shape[0])

In [None]:
# Enable Arrow for the pandas -> Spark conversion below.
# "spark.sql.execution.arrow.enabled" is deprecated since Spark 3.0; the
# PySpark-specific key replaces it.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Convert the pandas result into a Spark DataFrame, then show its schema and rows
spark_final = spark.createDataFrame(df_final)
spark_final.printSchema()
spark_final.show()

In [None]:
# Output folder: fixed base location plus a YYYYmmddHHMMSS stamp taken from datetime.now()
path = f'abfss://modsynapsefiles@modstorageforsynapse.dfs.core.windows.net/demodata/df_final/{datetime.now():%Y%m%d%H%M%S}'

In [None]:
# Write the Spark DataFrame to `path` as Parquet, using the 'overwrite' save mode
spark_final.write.mode('overwrite').parquet(path)

In [None]:
# Read the Parquet output back from `path` and count its rows
spark.read.parquet(path).count()