utils/check_flows.py (93 lines of code) (raw):
"""
This script processes mitmproxy flow files, filters them based on prefixes, and logs requests to domains specified as excluded.
It will:
- Identify the flow files whose names start with any of the given prefixes (all files when no prefix is given).
- Parse each flow file and iterate through the recorded HTTP flows.
- For each request, if its host matches an excluded domain, log a warning.
"""
import argparse
import logging
import os
import sys
from typing import Optional

from mitmproxy import io
from mitmproxy.exceptions import FlowReadException
def setup_logging(log_to_file: bool, log_file: str) -> logging.Logger:
    """
    Configure root logging and hand back this module's logger.

    Parameters
    ----------
    log_to_file : bool
        When True, records are written to ``log_file``; otherwise
        ``logging.basicConfig`` is left to use its default destination.
    log_file : str
        Destination path for log records. Only used when ``log_to_file`` is True.

    Returns
    -------
    logger : logging.Logger
        The logger named after this module.
    """
    # Both destinations share the same level and record layout; only the
    # optional ``filename`` differs.
    config = {
        "level": logging.INFO,
        "format": "%(asctime)s - %(levelname)s - %(message)s",
    }
    if log_to_file:
        config["filename"] = log_file
    logging.basicConfig(**config)
    return logging.getLogger(__name__)
def filter_flows(path: str = "flows", prefixes: Optional[list] = None) -> list:
    """
    Filter flow files by prefixes.

    Parameters
    ----------
    path : str
        The directory containing flow files.
    prefixes : list, optional
        File-name prefixes to keep. If None or empty, every entry in
        ``path`` is included.

    Returns
    -------
    files : list
        Paths (``path`` joined with the entry name) of the matching entries,
        sorted by entry name so the result is deterministic.

    Raises
    ------
    OSError
        Propagated from ``os.listdir`` when ``path`` cannot be listed
        (e.g. it does not exist).
    """
    # ``None`` replaces the former mutable ``[]`` default; both mean "no filter".
    wanted = tuple(prefixes) if prefixes else ()
    return [
        os.path.join(path, name)
        # os.listdir returns entries in arbitrary order; sort for stable output.
        for name in sorted(os.listdir(path))
        # str.startswith accepts a tuple and matches if any candidate matches.
        if not wanted or name.startswith(wanted)
    ]
def check_flows(
    path: str,
    exclude_domains: list,
    prefixes: list,
    logger: logging.Logger,
    verbose: bool,
) -> bool:
    """
    Process flow files and log requests to excluded domains.

    Every flow file selected by ``filter_flows(path, prefixes)`` is read with
    mitmproxy's ``FlowReader``. A request is flagged when any entry of
    ``exclude_domains`` occurs as a substring of the request's host.

    Parameters
    ----------
    path : str
        The directory containing flow files.
    exclude_domains : list
        A list of domain strings. Requests whose hosts match any of these domains are logged.
    prefixes : list
        A list of prefixes to filter flow files by.
    logger : logging.Logger
        Logger instance for logging information, warnings, and errors.
    verbose : bool
        Whether to log additional information about the request (and its
        response, when one was recorded).

    Returns
    -------
    bool
        True if any unintended flow is found, False otherwise.

    Notes
    -----
    Errors while reading a single file are logged and processing continues
    with the next file; flows flagged before the error still count.
    """
    unintended_flow_found = False
    # Retrieve filtered flow files based on provided prefixes
    for file_path in filter_flows(path, prefixes):
        logger.info(f"Processing flow file: {file_path}")
        try:
            with open(file_path, "rb") as f:
                freader = io.FlowReader(f)
                for flow in freader.stream():
                    # Skip flows that carry no request instead of letting an
                    # AttributeError abort the rest of this file.
                    request = getattr(flow, "request", None)
                    if request is None:
                        continue
                    host = request.pretty_host
                    # Check if the request host matches any excluded domain.
                    # Using substring matching here so that partial matches count.
                    if not any(domain in host for domain in exclude_domains):
                        continue
                    logger.warning(f"Flag raised: {host}")
                    unintended_flow_found = True
                    if not verbose:
                        continue
                    logger.warning(f"Request URL: {request.url}\n")
                    logger.warning(f"Request Method: {request.method}\n")
                    logger.warning(f"Request Headers: {request.headers}\n")
                    logger.warning(f"Request Content: {request.content}\n")
                    # A flow may have been recorded without a response; guard
                    # so the remaining flows in this file are still checked.
                    response = flow.response
                    if response is None:
                        logger.warning("Response: none recorded\n\n")
                        continue
                    logger.warning(f"Response Status Code: {response.status_code}\n")
                    logger.warning(f"Response Headers: {response.headers}\n")
                    logger.warning(f"Response Content: {response.content}\n\n")
        except FlowReadException as e:
            logger.error(f"Flow file '{file_path}' is not a valid flow file: {e}")
        except Exception as ex:
            # Best-effort: one unreadable file must not stop the whole scan.
            logger.error(f"An error occurred while processing '{file_path}': {ex}")
    return unintended_flow_found
def main() -> None:
    """
    Command-line entry point.

    Parses the arguments, configures logging, scans the flow files and exits
    with status 1 when ``check_flows`` reports a flagged flow, 0 otherwise.
    """
    cli = argparse.ArgumentParser(description="Process mitmproxy flow files.")
    cli.add_argument("--path", default="flows", help="Path to the mitmproxy flow files")
    cli.add_argument(
        "-e",
        "--exclude_domains",
        nargs="*",
        default=["www.expensify.com", "ws-mt1.pusher.com"],
        help="List of domains to identify and log from the flows.",
    )
    cli.add_argument(
        "-p",
        "--prefixes",
        nargs="*",
        default=[],
        help="List of prefixes to filter the flows.",
    )
    cli.add_argument(
        "--log_to_file",
        action="store_true",
        help="Log to a file instead of console.",
    )
    cli.add_argument(
        "--log_file",
        default="app.log",
        help="Log file name if logging to a file.",
    )
    cli.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Log additional information about the request.",
    )
    options = cli.parse_args()

    log = setup_logging(options.log_to_file, options.log_file)
    flagged = check_flows(
        options.path,
        options.exclude_domains,
        options.prefixes,
        log,
        options.verbose,
    )
    # A non-zero exit status signals that at least one flow was flagged.
    if flagged:
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()