in detection_rules/kbwrap.py [0:0]
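def search_alerts(ctx, query, date_range, columns, extend, max_count):
    """Search detection engine alerts with KQL.

    Args:
        ctx: Click context; ``ctx.obj['kibana']`` must hold the Kibana client,
            which is entered as a context manager for the search.
        query: KQL query string. A falsy value falls back to ``MATCH_ALL``.
        date_range: ``(start_time, end_time)`` pair added to the query as a
            range filter.
        columns: Optional iterable of column names to display.
        extend: When true, ``columns`` are appended to the default columns;
            otherwise they replace them.
        max_count: Passed as ``size`` to ``Signal.search``, so the result set
            is presumably capped at this many alerts.

    Returns:
        The list of alert ``_source`` documents; empty when nothing matched.
    """
    import copy

    from eql.table import Table
    from .eswrap import MATCH_ALL, add_range_to_dsl

    kibana = ctx.obj['kibana']
    start_time, end_time = date_range

    # The range filter is appended in place to the query's filter list, so work
    # on a private copy of MATCH_ALL; otherwise the imported constant is mutated
    # and every later caller in the same process inherits this time window.
    kql_query = kql.to_dsl(query) if query else copy.deepcopy(MATCH_ALL)
    add_range_to_dsl(kql_query['bool'].setdefault('filter', []), start_time, end_time)

    with kibana:
        # NOTE(review): only the first `max_count` hits are requested, so a short
        # or empty listing does not show that no further alerts exist in the range.
        response = Signal.search({'query': kql_query}, size=max_count)
        alerts = [hit['_source'] for hit in response['hits']['hits']]

    if alerts:
        table_columns = ['host.hostname']

        # The alert schema is inferred from the first hit only: nested `signal`
        # fields, dotted `kibana.alert.*` keys, or neither. Hits using a different
        # layout in the same result set will show empty cells for these columns.
        first_alert = alerts[0]
        if 'signal' in first_alert:
            table_columns += ['signal.rule.name', 'signal.status', 'signal.original_time']
        elif 'kibana.alert.rule.name' in first_alert:
            table_columns += ['kibana.alert.rule.name', 'kibana.alert.status', 'kibana.alert.original_time']
        else:
            table_columns += ['rule.name', '@timestamp']

        if columns:
            columns = list(columns)
            table_columns = table_columns + columns if extend else columns

        # Table requires the data to be nested, but depending on the version, some data uses dotted keys, so
        # they must be nested explicitly
        for alert in alerts:
            for key in table_columns:
                if key in alert:
                    nested_set(alert, key, alert[key])

        click.echo(Table.from_list(table_columns, alerts))
    else:
        click.echo('No alerts detected')

    return alerts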