in hunting/search.py [0:0]
def _filter_by_data_source(self, data_source: str) -> list:
    """Return full hunt records that match ``data_source``.

    Matching happens in two stages, and a hunt is never returned twice:

    1. Every hunt loaded from ``self.base_path`` whose ``integration`` field
       contains ``data_source``.
    2. Every UUID listed under ``self.hunting_index[data_source]`` that was
       not already matched in stage 1 and that has a loaded hunt file.

    Args:
        data_source: Value tested with ``in`` against each hunt's
            ``integration`` and used as a key into ``self.hunting_index``.
            NOTE(review): ``integration`` is presumably a list of strings;
            if it can be a plain string this becomes a substring match.

    Returns:
        A list of dicts, one per matching hunt. Each dict is a shallow copy
        of the hunt's ``__dict__`` plus the keys ``'mitre'``,
        ``'data_source'`` (the hunt's ``integration``), ``'uuid'`` and
        ``'path'`` (the file the hunt was loaded from). File matches come
        first in load order, followed by index-only matches in index order.
    """
    results = []
    seen_uuids = set()  # UUIDs already emitted, to avoid duplicates
    # uuid -> (hunt, path). Keeps each hunt paired with its own file so that
    # index matches report the right path, and replaces a linear rescan of
    # every loaded hunt per index entry with a dict lookup.
    hunts_by_uuid = {}

    def add_match(hunt, path):
        """Append the result record for ``hunt`` unless it was already added."""
        if hunt.uuid in seen_uuids:
            return
        record = hunt.__dict__.copy()
        record['mitre'] = hunt.mitre
        record['data_source'] = hunt.integration
        record['uuid'] = hunt.uuid
        record['path'] = path
        results.append(record)
        seen_uuids.add(hunt.uuid)

    # Step 1: check the files themselves by their 'integration' field, and
    # build the lookup table in the same pass.
    for hunt_content, file_path in load_all_toml(self.base_path):
        # setdefault keeps the first file seen for a UUID, matching the
        # first-match behaviour of the scan this table replaces.
        hunts_by_uuid.setdefault(hunt_content.uuid, (hunt_content, file_path))
        if data_source in hunt_content.integration:
            add_match(hunt_content, file_path)

    # Step 2: check the index for generic data sources (e.g., 'aws', 'linux').
    if data_source in self.hunting_index:
        for query_uuid in self.hunting_index[data_source]:
            entry = hunts_by_uuid.get(query_uuid)
            if entry is None:
                # Indexed UUID with no corresponding TOML file loaded: skip.
                continue
            hunt_content, file_path = entry
            add_match(hunt_content, file_path)

    return results