hunting/search.py (124 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License # 2.0; you may not use this file except in compliance with the Elastic License # 2.0. from pathlib import Path import click from detection_rules.attack import tactics_map, technique_lookup from .utils import load_index_file, load_all_toml class QueryIndex: def __init__(self, base_path: Path): """Initialize with the base path and load the index.""" self.base_path = base_path self.hunting_index = load_index_file() self.mitre_technique_ids = set() self.reverse_tactics_map = {v: k for k, v in tactics_map.items()} def _process_mitre_filter(self, mitre_filter: tuple): """Process the MITRE filter to gather all matching techniques.""" for filter_item in mitre_filter: if filter_item in self.reverse_tactics_map: self._process_tactic_id(filter_item) elif filter_item in technique_lookup: self._process_technique_id(filter_item) def _process_tactic_id(self, filter_item): """Helper method to process a tactic ID.""" tactic_name = self.reverse_tactics_map[filter_item] click.echo(f"Found tactic ID {filter_item} (Tactic Name: {tactic_name}). Searching for associated techniques.") for tech_id, details in technique_lookup.items(): kill_chain_phases = details.get('kill_chain_phases', []) if any(tactic_name.lower().replace(' ', '-') == phase['phase_name'] for phase in kill_chain_phases): self.mitre_technique_ids.add(tech_id) def _process_technique_id(self, filter_item): """Helper method to process a technique or sub-technique ID.""" self.mitre_technique_ids.add(filter_item) if '.' not in filter_item: sub_techniques = { sub_tech_id for sub_tech_id in technique_lookup if sub_tech_id.startswith(f"{filter_item}.") } self.mitre_technique_ids.update(sub_techniques) def search(self, mitre_filter: tuple = (), data_source: str = None, keyword: str = None) -> list: """Search the index based on MITRE techniques, data source, or keyword.""" results = [] # Step 1: If data source is provided, filter by data source first if data_source: click.echo(f"Filtering by data source: {data_source}") results = self._filter_by_data_source(data_source) if not results: # data source always takes precedence over other filters if provided click.echo(f"No matching queries found for data source: {data_source}") return results # Step 2: If MITRE filter is provided, process the filter if mitre_filter: click.echo(f"Searching for MITRE techniques: {mitre_filter}") self._process_mitre_filter(mitre_filter) if results: # Filter existing results further by MITRE if data source results already exist results = [result for result in results if any(tech in self.mitre_technique_ids for tech in result['mitre'])] else: # Otherwise, perform a fresh search based on MITRE filter results = self._search_index(mitre_filter) # Step 3: If keyword is provided, search for it in name, description, and notes if keyword: click.echo(f"Searching for keyword: {keyword}") if results: # Filter existing results further by keyword results = [result for result in results if self._matches_keyword(result, keyword)] else: # Perform a fresh search by keyword results = self._search_keyword(keyword) return self._handle_no_results(results, mitre_filter, data_source, keyword) def _search_index(self, mitre_filter: tuple = ()) -> list: """Private method to search the index based on MITRE filter.""" results = [] # Load all TOML data for detailed fields hunting_content = load_all_toml(self.base_path) for hunt_content, file_path in hunting_content: query_techniques = hunt_content.mitre if mitre_filter and not any(tech in self.mitre_technique_ids for tech in query_techniques): continue # Prepare the result with full hunt content fields matches = hunt_content.__dict__.copy() matches['mitre'] = hunt_content.mitre matches['data_source'] = hunt_content.integration matches['uuid'] = hunt_content.uuid matches['path'] = file_path results.append(matches) return results def _search_keyword(self, keyword: str) -> list: """Private method to search description, name, notes, and references fields for a keyword.""" results = [] hunting_content = load_all_toml(self.base_path) for hunt_content, file_path in hunting_content: # Assign blank if notes or references are missing notes = '::'.join(hunt_content.notes) if hunt_content.notes else '' references = '::'.join(hunt_content.references) if hunt_content.references else '' # Combine name, description, notes, and references for the search combined_content = f"{hunt_content.name}::{hunt_content.description}::{notes}::{references}" if keyword.lower() in combined_content.lower(): # Copy hunt_content data and prepare the result matches = hunt_content.__dict__.copy() matches['mitre'] = hunt_content.mitre matches['data_source'] = hunt_content.integration matches['uuid'] = hunt_content.uuid matches['path'] = file_path results.append(matches) return results def _filter_by_data_source(self, data_source: str) -> list: """Filter the index by data source, checking both the actual files and the index.""" results = [] seen_uuids = set() # Track UUIDs to avoid duplicates # Load all TOML data for detailed fields hunting_content = load_all_toml(self.base_path) # Step 1: Check files first by their 'integration' field for hunt_content, file_path in hunting_content: if data_source in hunt_content.integration: if hunt_content.uuid not in seen_uuids: # Prepare the result with full hunt content fields matches = hunt_content.__dict__.copy() matches['mitre'] = hunt_content.mitre matches['data_source'] = hunt_content.integration matches['uuid'] = hunt_content.uuid matches['path'] = file_path results.append(matches) seen_uuids.add(hunt_content.uuid) # Step 2: Check the index for generic data sources (e.g., 'aws', 'linux') if data_source in self.hunting_index: for query_uuid, query_data in self.hunting_index[data_source].items(): if query_uuid not in seen_uuids: # Find corresponding TOML content for this query hunt_content = next((hunt for hunt, path in hunting_content if hunt.uuid == query_uuid), None) if hunt_content: # Prepare the result with full hunt content fields matches = hunt_content.__dict__.copy() matches['mitre'] = hunt_content.mitre matches['data_source'] = hunt_content.integration matches['uuid'] = hunt_content.uuid matches['path'] = file_path results.append(matches) seen_uuids.add(query_uuid) return results def _matches_keyword(self, result: dict, keyword: str) -> bool: """Check if the result matches the keyword in name, description, or notes.""" # Combine relevant fields for keyword search notes = '::'.join(result.get('notes', [])) if 'notes' in result else '' references = '::'.join(result.get('references', [])) if 'references' in result else '' combined_content = f"{result['name']}::{result['description']}::{notes}::{references}" return keyword.lower() in combined_content.lower() def _handle_no_results(self, results: list, mitre_filter=None, data_source=None, keyword=None) -> list: """Handle cases where no results are found.""" if not results: if mitre_filter and not self.mitre_technique_ids: click.echo(f"No MITRE techniques found for the provided filter: {mitre_filter}.") if data_source: click.echo(f"No matching queries found for data source: {data_source}") if keyword: click.echo(f"No matches found for keyword: {keyword}") return results