tutorials-and-examples/example-notebooks/mp_data.py (54 lines of code) (raw):
# -------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
"""Demo QueryProvider."""
from pathlib import Path
import pickle
from typing import Any, Iterable
from time import sleep
import pandas as pd
def read_pd_df(data_file, query_name):
    """
    Read a DataFrame from a CSV or pickle file.

    Parameters
    ----------
    data_file : str or os.PathLike
        Path of the file to load. A name ending in "csv" (any case) is
        read with `pd.read_csv`, parsing the "TimeGenerated" column as
        dates; anything else is read with `pd.read_pickle`.
    query_name : str
        Name of the query the data belongs to. Only used in the error
        message.

    Returns
    -------
    pd.DataFrame
        The contents of the file (for pickle files, whatever object
        was pickled).

    Raises
    ------
    FileNotFoundError
        If `data_file` is None or does not point to an existing file.

    """
    # None is what a failed dict lookup hands us; report it as a missing
    # data file rather than letting Path(None) fail with a TypeError.
    if data_file is None or not Path(data_file).is_file():
        raise FileNotFoundError(
            f"Data file {data_file} for query {query_name} not found."
        )
    # str() so that pathlib.Path arguments work as well as plain strings.
    if str(data_file).lower().endswith("csv"):
        # `infer_datetime_format` is deliberately not passed: pandas 2.0
        # deprecated it for read_csv (format inference is the default).
        return pd.read_csv(data_file, parse_dates=["TimeGenerated"])
    # NOTE: unpickling can execute arbitrary code - only use trusted files.
    return pd.read_pickle(data_file)
class TILookupDemo:
    """Demo version of TILookup that serves results from local data files."""

    # Maps each supported IoC type to the data file holding its results.
    _DATA_DEFS = {
        "ipv4": "data/ti_results_ipv4.pkl",
        "url": "data/ti_results_url.pkl",
    }

    def lookup_ioc(self, ioc_type, **kwargs):
        """
        Return the stored results for a single IoC type.

        Pauses for one second, then loads the data file registered for
        `ioc_type` via `read_pd_df`. Any extra keyword arguments are
        accepted and ignored.
        """
        sleep(1)
        # .get() yields None for an unregistered type; read_pd_df receives
        # that value unchanged.
        data_path = self._DATA_DEFS.get(ioc_type)
        return read_pd_df(data_path, ioc_type)

    @staticmethod
    def result_to_df(results):
        """
        Return IoC results as a DataFrame.

        A DataFrame input is handed back as-is (same object); any other
        input produces a new empty DataFrame.
        """
        return results if isinstance(results, pd.DataFrame) else pd.DataFrame()
class GeoLiteLookupDemo:
    """GeoLiteLookup demo class that returns pre-saved location data."""

    # Location of the pickled IP location data.
    _DATA_DEFS = {
        "ip_locs": "data/ip_locations.pkl",
    }

    def lookup_ip(
        self,
        ip_address: str = None,
        ip_addr_list: Iterable = None,
        ip_entity: Any = None,
    ):
        """
        Return the saved IP location data.

        All three arguments are accepted but discarded: the same pickled
        data is returned whatever is asked for.

        Returns
        -------
        tuple
            `(str(data), data)` where `data` is the unpickled object.
        """
        # The inputs play no part in the result.
        del ip_address, ip_addr_list, ip_entity
        pickle_path = self._DATA_DEFS["ip_locs"]
        # NOTE: unpickling can execute arbitrary code - only use trusted files.
        with open(pickle_path, "rb") as pickle_stream:
            locations = pickle.load(pickle_stream)
        return str(locations), locations
# Whois/ASN demo data, loaded once when this module is imported.
# get_whois_info_demo reads the "AllExtIPs" and "ExtASN" columns from it.
# NOTE: the path is relative, so it is resolved against the current working
# directory at import time.
_ASN_DATA = pd.read_pickle("data/az_whois.df.pkl")
def get_whois_info_demo(ip_addr, show_progress=False):
    """
    Look up the ASN text for an IP address in the demo Whois data.

    Parameters
    ----------
    ip_addr : str
        IP address to match against the "AllExtIPs" column of the
        module-level `_ASN_DATA` DataFrame.
    show_progress : bool, optional
        If True, print a "." (without a newline) for each call.

    Returns
    -------
    tuple
        `(asn_text, {})`. `asn_text` is the first distinct "ExtASN"
        value among the matching rows (its first element if that value
        is a tuple), or "Unknown" when the data lacks the required
        columns or has no row for `ip_addr`. The second item is always
        an empty dict.

    """
    sleep(0.02)
    if show_progress:
        print(".", end="")
    if not {"ExtASN", "AllExtIPs"}.issubset(_ASN_DATA.columns):
        return "Unknown", {}
    asn_values = _ASN_DATA.loc[_ASN_DATA["AllExtIPs"] == ip_addr, "ExtASN"].unique()
    # No row for this address: fall back instead of indexing an empty array.
    if len(asn_values) == 0:
        return "Unknown", {}
    asn_text = asn_values[0]
    if isinstance(asn_text, tuple):
        return asn_text[0], {}
    return asn_text, {}