in hunting/__main__.py [0:0]
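def run_query(uuid: str, file_path: str, run_all: bool, wait_time: int):
    """Run a hunting query by UUID or file path. Only ES|QL queries are supported.

    The hunt file is located, the user's Elasticsearch configuration is loaded,
    and the hunt's description and eligible queries are echoed to the terminal
    before anything is sent to the cluster, so the operator can review what
    will run. With ``run_all`` every eligible query is executed; otherwise the
    user is prompted to pick one.

    Args:
        uuid: Hunt UUID used to locate the hunt file (see ``get_hunt_path``).
        file_path: Explicit path to a hunt file, alternative to ``uuid``.
        run_all: When true, run every eligible query without prompting.
        wait_time: Passed through to the QueryRunner between/after runs.

    Returns:
        None. Prints a message and returns early when the hunt cannot be
        resolved, no configuration exists, or no eligible query is found.
    """
    hunt_path, error_message = get_hunt_path(uuid, file_path)
    if error_message:
        click.echo(error_message)
        return

    config = parse_user_config()
    if not config:
        click.secho("No configuration found. Please add a `detection-rules-cfg` file.", fg="red", bold=True)
        return
    es_config = filter_elasticsearch_params(config)
    query_runner = QueryRunner(es_config)

    hunting_data = query_runner.load_hunting_file(hunt_path)

    # Show the hunt's description before any query reaches the cluster.
    wrapped_description = textwrap.fill(hunting_data.description, width=120)
    click.secho("\nHunting Description:", fg="blue", bold=True)
    click.secho(f"\n{wrapped_description}\n", bold=True)

    # Eligibility is a case-sensitive substring test for "from" (presumably the
    # ES|QL source command); the original list index is kept as the key so the
    # numbering shown to the user is stable.
    eligible_queries = {i: query for i, query in enumerate(hunting_data.query) if "from" in query}
    if not eligible_queries:
        click.secho("No eligible queries found in the file.", fg="red", bold=True)
        return

    if run_all:
        # --all: run every eligible query, no interactive selection
        query_runner.run_all_queries(eligible_queries, wait_time)
        return

    selected_query = _prompt_for_query(query_runner, eligible_queries)
    query_runner.run_individual_query(selected_query, wait_time)


def _prompt_for_query(query_runner, eligible_queries: dict):
    """List the eligible queries and prompt until the user picks a valid one.

    Args:
        query_runner: QueryRunner used only to format each query for display.
        eligible_queries: Mapping of zero-based index -> query text.

    Returns:
        The query text the user selected.
    """
    click.secho("Available queries:", fg="blue", bold=True)
    for index, query in eligible_queries.items():
        # Displayed 1-based; the prompt below maps the answer back with -1.
        click.secho(f"\nQuery {index + 1}:", fg="green", bold=True)
        click.echo(query_runner._format_query(query))
        click.secho("\n" + "-" * 120, fg="yellow")

    while True:
        try:
            query_number = click.prompt("Enter the query number", type=int)
        except ValueError:
            # click's int conversion normally re-prompts on bad input itself;
            # this remains as a safety net.
            click.secho("Please enter a valid number.", fg="yellow")
            continue
        key = query_number - 1
        if key in eligible_queries:
            return eligible_queries[key]
        click.secho(f"Invalid query number: {query_number}. Please try again.", fg="yellow")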