lib/kibana/kibana/resources.py (243 lines of code) (raw):
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0; you may not use this file except in compliance with the Elastic License
# 2.0.
import datetime
from typing import Any, List, Optional, Type
import json
from .connector import Kibana
from . import definitions
# Page size used by BaseResource.find when the caller does not pass `per_page`.
DEFAULT_PAGE_SIZE = 10
class BaseResource(dict):
    """Dictionary-backed representation of a Kibana REST resource.

    Subclasses set ``BASE_URI`` to the endpoint of the resource and may
    override ``ID_FIELD`` when the identifier is stored under another key.
    Every request is sent through ``Kibana.current()``.
    """

    # endpoint the requests of this resource are sent to
    BASE_URI = ""
    # key (and query parameter name) holding the resource identifier
    ID_FIELD = "id"

    @property
    def id(self):
        """Return the value stored under ``ID_FIELD``, or ``None`` when it is absent."""
        return self.get(self.ID_FIELD)

    @classmethod
    def bulk_create_legacy(cls, resources: list):
        """Create several resources with a single ``_bulk_create`` request.

        :param resources: instances of ``cls`` to create
        :return: one ``cls`` instance per entry of the response
        :raises AssertionError: if an entry of ``resources`` is not an instance of ``cls``
        """
        for r in resources:
            assert isinstance(r, cls)

        # _bulk_create is being deprecated. Leave for backwards compat only
        # the new API would be import with multiple rules within an ndjson request
        responses = Kibana.current().post(cls.BASE_URI + "/_bulk_create", data=resources)
        return [cls(r) for r in responses]

    def create(self):
        """POST this resource to ``BASE_URI``, merge the response into it and return ``self``."""
        response = Kibana.current().post(self.BASE_URI, data=self)
        self.update(response)
        return self

    @classmethod
    def find(cls, per_page=None, **params) -> 'ResourceIterator':
        """Return an iterator over the resources reported by the ``_find`` endpoint.

        :param per_page: page size of each request; ``DEFAULT_PAGE_SIZE`` when ``None``
        :param params: extra query parameters; ``sort_field`` and ``sort_order``
            default to ``"name"`` and ``"asc"`` when not supplied
        """
        if per_page is None:
            per_page = DEFAULT_PAGE_SIZE

        # _id is no valid sort field so we sort by name by default
        params.setdefault("sort_field", "name")
        params.setdefault("sort_order", "asc")

        return ResourceIterator(cls, cls.BASE_URI + "/_find", per_page=per_page, **params)

    @classmethod
    def from_id(cls, resource_id) -> 'BaseResource':
        """GET the resource identified by ``resource_id``, passed as the ``ID_FIELD`` query parameter.

        NOTE(review): the response of ``Kibana.get`` is returned as-is and is not
        wrapped in ``cls`` - confirm whether callers expect an instance.
        """
        return Kibana.current().get(cls.BASE_URI, params={cls.ID_FIELD: resource_id})

    def put(self):
        """PUT this resource to ``BASE_URI``, merge the response into it and return ``self``."""
        # the resource is itself a dict, so it is sent and refreshed the same way `create` does it
        response = Kibana.current().put(self.BASE_URI, data=self)
        self.update(response)
        return self

    def delete(self):
        """DELETE this resource, identified by its ``ID_FIELD`` value, and return the response."""
        # use ID_FIELD for the parameter name so it always matches what `id` and `from_id` use
        return Kibana.current().delete(self.BASE_URI, params={self.ID_FIELD: self.id})
class ResourceIterator(object):
    """Iterator yielding the resources of a paginated Kibana endpoint, one page request at a time."""

    def __init__(self, cls: Type[BaseResource], uri: str, per_page: int, **params: dict):
        """Remember the target endpoint and start with no page loaded.

        :param cls: type used to wrap every returned entry
        :param uri: endpoint queried for each page
        :param per_page: number of entries requested per page
        :param params: extra query parameters sent with every page request
        """
        # request description
        self.cls = cls
        self.uri = uri
        self.params = params

        # pagination state; `total` stays None until the first page is loaded
        self.page = 0
        self.per_page = per_page
        self.fetched = 0
        self.current = None
        self.total = None

        # entries of the page being consumed and the position of the next one
        self.batch = []
        self.batch_pos = 0

        self.kibana = Kibana.current()

    def __iter__(self):
        """Return the iterator itself."""
        return self

    def _batch(self):
        """Request the page following ``self.page`` and make it the current batch."""
        query = dict(per_page=self.per_page, page=self.page + 1, **self.params)
        reply = self.kibana.get(self.uri, params=query, error=True)

        # the values reported by the response replace the local pagination state
        self.page = reply["page"]
        self.per_page = reply["perPage"]
        self.total = reply["total"]

        self.batch = reply["data"]
        self.batch_pos = 0
        self.fetched += len(self.batch)

    def __next__(self) -> BaseResource:
        """Return the next entry wrapped in ``self.cls``; raise ``StopIteration`` when none is left."""
        # a completely consumed batch of exactly `per_page` entries may be followed by another page
        consumed_full_page = 0 < self.batch_pos == len(self.batch) == self.per_page
        if self.total is None or consumed_full_page:
            self._batch()

        if self.batch_pos >= len(self.batch):
            raise StopIteration()

        item = self.cls(self.batch[self.batch_pos])
        self.batch_pos += 1
        return item
class RuleResource(BaseResource):
    """Detection engine rule, served by the ``/api/detection_engine/rules`` endpoints."""

    BASE_URI = "/api/detection_engine/rules"

    @staticmethod
    def _add_internal_filter(is_internal: bool, params: dict) -> dict:
        """Restrict ``params["filter"]`` to rules tagged ``__internal_immutable:<is_internal>``.

        An existing ``filter`` entry is kept and combined with the tag filter using ``and``.
        ``params`` is modified in place and also returned.
        """
        custom_filter = f'alert.attributes.tags:"__internal_immutable:{str(is_internal).lower()}"'
        if params.get("filter"):
            params["filter"] = f"({params['filter']}) and ({custom_filter})"
        else:
            params["filter"] = custom_filter
        return params

    @classmethod
    def find_custom(cls, **params):
        """Find the rules tagged ``__internal_immutable:false``; ``params`` are forwarded to ``find``."""
        params = cls._add_internal_filter(False, params)
        return cls.find(**params)

    @classmethod
    def find_elastic(cls, **params):
        """Find the rules tagged ``__internal_immutable:true``; ``params`` are forwarded to ``find``."""
        # GET params:
        #   * `sort_field`
        #   * `sort_order`
        #   * `filter` (accepts KQL)
        #       alert.attributes.name:mshta
        #       alert.attributes.enabled:true/false
        #
        #       ...
        # i.e. Rule.find_elastic(filter="alert.attributes.name:mshta")
        params = cls._add_internal_filter(True, params)
        return cls.find(**params)

    @classmethod
    def bulk_action(
        cls, action: definitions.RuleBulkActions, rule_ids: Optional[List[str]] = None, query: Optional[str] = None,
        dry_run: Optional[bool] = False, edit_object: Optional[list[definitions.RuleBulkEditActionTypes]] = None,
        include_exceptions: Optional[bool] = False, **kwargs
    ) -> tuple[dict, List['RuleResource']]:
        """Perform a bulk action on rules using the _bulk_action API.

        :param action: name of the bulk action to run
        :param rule_ids: rules to act on; mutually exclusive with ``query``
        :param query: query selecting the rules to act on; mutually exclusive with ``rule_ids``
        :param dry_run: sent as the ``dry_run`` query parameter
        :param edit_object: edit operations, required when ``action`` is ``'edit'``
        :param include_exceptions: sent as ``duplicate.include_exceptions``
        :param kwargs: forwarded to ``Kibana.post``
        :return: the response (a list of parsed ndjson lines for ``'export'``) and the affected
            rules re-read through ``export_rules``; the list is empty when no rule id is reported
        :raises AssertionError: if both ``rule_ids`` and ``query`` are given, or ``'edit'`` lacks ``edit_object``
        """
        assert not (rule_ids and query), 'Cannot provide both rule_ids and query'

        if action == 'edit':
            assert edit_object, 'edit action requires edit object'

        duplicate = {'include_exceptions': include_exceptions, 'include_expired_exceptions': False}
        params = dict(dry_run=stringify_bool(dry_run))
        data = dict(action=action, edit=edit_object, duplicate=duplicate)
        if query:
            data['query'] = query
        elif rule_ids:
            data['rule_ids'] = rule_ids

        response = Kibana.current().post(cls.BASE_URI + "/_bulk_action", params=params, data=data, **kwargs)

        # export returns ndjson, which requires manual parsing since response.json() fails
        if action == 'export':
            response = [json.loads(r) for r in response.text.splitlines()]
            result_ids = [r['rule_id'] for r in response if 'rule_id' in r]
        else:
            results = response['attributes']['results']
            result_ids = [r['rule_id'] for r in results['updated']]
            result_ids.extend(r['rule_id'] for r in results['created'])

        # export_rules treats an empty id list as "export everything", so skip the call
        # when the action reported no updated/created rule (e.g. delete)
        rule_resources = cls.export_rules(result_ids) if result_ids else []
        return response, rule_resources

    @classmethod
    def bulk_enable(
        cls, rule_ids: Optional[List[str]] = None, query: Optional[str] = None, dry_run: Optional[bool] = False
    ) -> tuple[dict, List['RuleResource']]:
        """Bulk enable rules using _bulk_action."""
        return cls.bulk_action("enable", rule_ids=rule_ids, query=query, dry_run=dry_run)

    @classmethod
    def bulk_disable(
        cls, rule_ids: Optional[List[str]] = None, query: Optional[str] = None, dry_run: Optional[bool] = False
    ) -> tuple[dict, List['RuleResource']]:
        """Bulk disable rules using _bulk_action."""
        return cls.bulk_action("disable", rule_ids=rule_ids, query=query, dry_run=dry_run)

    @classmethod
    def bulk_delete(
        cls, rule_ids: Optional[List[str]] = None, query: Optional[str] = None, dry_run: Optional[bool] = False
    ) -> tuple[dict, List['RuleResource']]:
        """Bulk delete rules using _bulk_action."""
        return cls.bulk_action("delete", rule_ids=rule_ids, query=query, dry_run=dry_run)

    @classmethod
    def bulk_duplicate(
        cls, rule_ids: Optional[List[str]] = None, query: Optional[str] = None, dry_run: Optional[bool] = False,
        include_exceptions: Optional[bool] = False
    ) -> tuple[dict, List['RuleResource']]:
        """Bulk duplicate rules using _bulk_action."""
        return cls.bulk_action("duplicate", rule_ids=rule_ids, query=query, dry_run=dry_run,
                               include_exceptions=include_exceptions)

    @classmethod
    def bulk_export(
        cls, rule_ids: Optional[List[str]] = None, query: Optional[str] = None
    ) -> tuple[dict, List['RuleResource']]:
        """Bulk export rules using _bulk_action."""
        # raw=True is forwarded to Kibana.post; bulk_action reads the ndjson body from `response.text`
        return cls.bulk_action("export", rule_ids=rule_ids, query=query, raw=True)

    @classmethod
    def bulk_edit(
        cls, edit_object: list[definitions.RuleBulkEditActionTypes], rule_ids: Optional[List[str]] = None,
        query: Optional[str] = None, dry_run: Optional[bool] = False
    ) -> tuple[dict, List['RuleResource']]:
        """Bulk edit rules using _bulk_action."""
        # setting to error=False because the API returns a 500 with any failures, but includes the success data as well
        return cls.bulk_action(
            "edit", rule_ids=rule_ids, query=query, dry_run=dry_run, edit_object=edit_object, error=False
        )

    def put(self):
        """PUT this rule to ``BASE_URI`` without its ``rule_id``, merge the response and return ``self``.

        ``rule_id`` is removed before the request and put back if the request fails,
        in which case the exception is re-raised.
        """
        # id and rule_id are mutually exclusive
        rule_id = self.get("rule_id")
        self.pop("rule_id", None)

        try:
            # apparently Kibana doesn't like `rule_id` for existing documents
            # the request is sent here: `super().update()` resolves to dict.update() and sends nothing
            response = Kibana.current().put(self.BASE_URI, data=self)
            self.update(response)
            return self
        except Exception:
            # if it fails, restore the id back
            if rule_id:
                self["rule_id"] = rule_id
            raise

    @classmethod
    def import_rules(
        cls,
        rules: List[dict],
        exceptions: Optional[List[List[dict]]] = None,
        action_connectors: Optional[List[List[dict]]] = None,
        overwrite: bool = False,
        overwrite_exceptions: bool = False,
        overwrite_action_connectors: bool = False,
    ) -> tuple[dict, list, List[Optional["RuleResource"]]]:
        """Import a list of rules into Kibana using the _import API and return the response and successful imports.

        :param rules: rule documents; each must contain ``rule_id``
        :param exceptions: lists of exception documents, flattened into the same upload
        :param action_connectors: lists of action connector documents, flattened into the same upload
        :param overwrite: sent as the ``overwrite`` query parameter
        :param overwrite_exceptions: sent as the ``overwrite_exceptions`` query parameter
        :param overwrite_action_connectors: sent as the ``overwrite_action_connectors`` query parameter
        :return: the response, the ``rule_id`` values without a reported error, and those rules
            re-read through ``export_rules``
        """
        # None stands for "nothing to add"; avoids sharing a mutable default between calls
        exceptions = exceptions or []
        action_connectors = action_connectors or []

        url = f'{cls.BASE_URI}/_import'
        params = dict(
            overwrite=stringify_bool(overwrite),
            overwrite_exceptions=stringify_bool(overwrite_exceptions),
            overwrite_action_connectors=stringify_bool(overwrite_action_connectors),
        )
        rule_ids = [r['rule_id'] for r in rules]
        flattened_exceptions = [e for sublist in exceptions for e in sublist]
        flattened_actions_connectors = [a for sublist in action_connectors for a in sublist]
        headers, raw_data = Kibana.ndjson_file_data_prep(
            rules + flattened_exceptions + flattened_actions_connectors, "import.ndjson"
        )
        response = Kibana.current().post(url, headers=headers, params=params, raw_data=raw_data)
        errors = response.get("errors", [])
        # a set keeps the membership test below cheap; entries without `rule_id` cannot be matched to a rule
        error_rule_ids = {e['rule_id'] for e in errors if 'rule_id' in e}

        # successful rule_ids are not returned, so they must be implicitly inferred from errored rule_ids
        successful_rule_ids = [r for r in rule_ids if r not in error_rule_ids]
        rule_resources = cls.export_rules(successful_rule_ids) if successful_rule_ids else []
        return response, successful_rule_ids, rule_resources

    @classmethod
    def export_rules(cls, rule_ids: Optional[List[str]] = None,
                     exclude_export_details: bool = True) -> List['RuleResource']:
        """Export a list of rules from Kibana using the _export API.

        :param rule_ids: ``rule_id`` values to export; when empty or ``None`` no request body is sent
        :param exclude_export_details: sent as the ``exclude_export_details`` query parameter
        :return: one instance per line of the ndjson response
        """
        url = f'{cls.BASE_URI}/_export'

        # the request body lists the wanted rules; it is left out entirely when no id is given
        payload = {'objects': [{'rule_id': r} for r in rule_ids]} if rule_ids else None

        params = dict(exclude_export_details=stringify_bool(exclude_export_details))
        response = Kibana.current().post(url, params=params, data=payload, raw=True)
        data = [json.loads(r) for r in response.text.splitlines()]
        return [cls(r) for r in data]
class Signal(BaseResource):
    """Class-level helpers for the ``/api/detection_engine/signals`` endpoints."""

    BASE_URI = "/api/detection_engine/signals"

    def __init__(self):
        """Always raise: only the class methods are usable."""
        raise NotImplementedError("Signals can't be instantiated yet")

    @classmethod
    def search(cls, query_dsl: dict, size: Optional[int] = 10):
        """POST ``query_dsl`` to the ``search`` endpoint and return the response.

        :param query_dsl: search body; it is copied, not modified
        :param size: value used for ``size`` when ``query_dsl`` does not set one itself;
            ``None`` leaves ``size`` out of the request
        """
        # building the payload with dict(size=size, **query_dsl) raises TypeError as soon as
        # query_dsl carries its own "size", so the default is only filled in when it is missing
        payload = dict(query_dsl)
        if size is not None:
            payload.setdefault("size", size)
        return Kibana.current().post(f"{cls.BASE_URI}/search", data=payload)

    @classmethod
    def last_signal(cls) -> tuple[Optional[int], Optional[datetime.datetime]]:
        """Return the number of open signals and the most recent ``@timestamp`` among them.

        Either value is ``None`` when the response does not contain it.
        """
        query_dsl = {
            "aggs": {
                "lastSeen": {"max": {"field": "@timestamp"}}
            },
            'query': {
                "bool": {
                    "filter": [
                        {"match": {"signal.status": "open"}}
                    ]
                }
            },
            "size": 0,
            "track_total_hits": True
        }
        response = cls.search(query_dsl)
        # the aggregation must be read back under the name it was requested with ("lastSeen")
        last_seen = response.get("aggregations", {}).get("lastSeen", {}).get("value_as_string")
        num_signals = response.get("hits", {}).get("total", {}).get("value")

        if last_seen is not None:
            last_seen = datetime.datetime.strptime(last_seen, "%Y-%m-%dT%H:%M:%S.%f%z")

        return num_signals, last_seen

    @classmethod
    def all(cls, size: Optional[int] = 10):
        """Search with a ``match_all`` filter, requesting ``size`` results."""
        return cls.search({"query": {"bool": {"filter": {"match_all": {}}}}}, size=size)

    @classmethod
    def set_status_many(cls, signal_ids: List[str], status: str) -> dict:
        """POST ``signal_ids`` and ``status`` to the ``status`` endpoint and return the response."""
        return Kibana.current().post(f"{cls.BASE_URI}/status", data={"signal_ids": signal_ids, "status": status})

    @classmethod
    def close_many(cls, signal_ids: List[str]):
        """Set the status of ``signal_ids`` to ``"closed"``."""
        return cls.set_status_many(signal_ids, "closed")

    @classmethod
    def open_many(cls, signal_ids: List[str]):
        """Set the status of ``signal_ids`` to ``"open"``."""
        return cls.set_status_many(signal_ids, "open")
def stringify_bool(obj: bool) -> str:
    """Return ``"true"`` or ``"false"`` for a boolean; any other type fails the assertion."""
    assert isinstance(obj, bool), f"Expected a boolean, got {type(obj)}"
    return "true" if obj else "false"