core/kibana.py (105 lines of code) (raw):

# Copyright 2025 Elasticsearch B.V. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Helper class for accessing the Kibana REST API.""" import json import uuid import requests class Kibana: """Minimal (unofficial) Kibana REST API Python client""" exceptions = requests.exceptions def __init__(self, url=None, cloud_id=None, basic_auth=None, api_key=None, verify_certs=True, ca_certs=None): if not (url or cloud_id): raise ValueError("Either `url` or `cloud_id` must be defined") self.url = url self.session = requests.Session() self.session.headers.update({"Content-Type": "application/json", "kbn-xsrf": str(uuid.uuid4())}) if api_key: self.session.headers["Authorization"] = f"ApiKey {api_key}" if basic_auth: self.session.auth = requests.auth.HTTPBasicAuth(*basic_auth) if not verify_certs: self.session.verify = False elif ca_certs: self.session.verify = ca_certs retry_strategy = requests.packages.urllib3.util.retry.Retry( total=3, allowed_methods=["HEAD", "GET", "OPTIONS"], ) adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) self.session.mount("https://", adapter) self.session.mount("http://", adapter) if cloud_id: import base64 cluster_name, cloud_info = cloud_id.split(":") domain, es_uuid, kibana_uuid = base64.b64decode(cloud_info.encode("utf-8")).decode("utf-8").split("$") if domain.endswith(":443"): domain = domain[:-4] url_from_cloud = f"https://{kibana_uuid}.{domain}:9243" if self.url and self.url != url_from_cloud: raise ValueError(f"url provided ({self.url}) does not match url derived from cloud_id {url_from_cloud}") self.url = url_from_cloud def close(self): self.session.close() def ping(self): try: self.status() return True except requests.exceptions.ConnectionError: return False def status(self): url = f"{self.url}/api/status" res = self.session.get(url) res.raise_for_status() return res.json() def create_siem_index(self): url = f"{self.url}/api/detection_engine/index" res = self.session.post(url) res.raise_for_status() return res.json() def get_siem_index(self): url = f"{self.url}/api/detection_engine/index" res = self.session.get(url) res.raise_for_status() return res.json() def create_detection_engine_rule(self, rule): url = f"{self.url}/api/detection_engine/rules" res = self.session.post(url, data=json.dumps(rule)) res.raise_for_status() return res.json() def get_detection_engine_rule(self, rule): url = f"{self.url}/api/detection_engine/rules?id={rule['id']}" res = self.session.get(url, data=json.dumps(rule)) res.raise_for_status() return res.json() def delete_detection_engine_rule(self, rule): url = f"{self.url}/api/detection_engine/rules?id={rule['id']}" res = self.session.delete(url) res.raise_for_status() return res.json() def find_detection_engine_rules(self, count_max, enabled=None): count_max += 1 url = f"{self.url}/api/detection_engine/rules/_find?per_page={count_max}" if enabled is not None: url += f"&filter=alert.attributes.enabled:{str(enabled).lower()}" res = self.session.get(url) res.raise_for_status() rules = res.json()["data"] if len(rules) == count_max: raise ValueError(f"The number of returned rules is suspiciously equal to count_max ({count_max})") return rules def create_detection_engine_rules(self, rules): body = "\n".join(json.dumps(rule) for rule in rules) files = {"file": ("rules.ndjson", body, "application/octet-stream")} url = f"{self.url}/api/detection_engine/rules/_import" res = self.session.post(url, files=files, headers={"Content-Type": None}) res.raise_for_status() ret = res.json() if ret["errors"]: raise ValueError("Could not create rule(s):\n " + "\n ".join(str(x) for x in ret["errors"])) return ret def delete_all_detection_engine_rules(self): url = f"{self.url}/api/detection_engine/rules/_bulk_action" req = {"action": "delete", "query": ""} res = self.session.post(url, data=json.dumps(req)) res.raise_for_status() def search_detection_engine_signals(self, body): url = f"{self.url}/api/detection_engine/signals/search" res = self.session.post(url, data=json.dumps(body)) res.raise_for_status() return res.json()