helper-scripts/export_kibana_config.py:
#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""
Helper script to export Kibana and Elasticsearch objects
"""
import json
import re
import argparse
import sys
import os
import requests
import ndjson
# Quick hack to disable invalid cert warning
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
SCHEME = "https"
KIBANA_URL = SCHEME + "://localhost:5601"
KIBANA_OBJECTS_EXPORT_URL = KIBANA_URL + "/api/saved_objects/_export"
REDELK_OBJ_FILTER = "RedELK"
INDEX_PATTERNS_FILTER = (
"rtops|redirtraffic|implantsdb|bluecheck|credentials|email|redelk|.siem-signals"
)
EXPORT_FILES_PREFIX_KIBANA = "redelk_kibana_"
ES_URL = SCHEME + "://localhost:9200"
ES_TEMPLATES_LIST = [
"rtops",
"redirtraffic",
"implantsdb",
"bluecheck",
"credentials",
"email",
"redelk",
]
ES_INDEX_TEMPLATES_LIST = ["redelk-domainslist"]
EXPORT_FILES_PREFIX_ES = "redelk_elasticsearch_"
DIFF_PATH = "diff/" # path is relative to exportpath
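# Example invocations (illustrative), assuming the script runs on the RedELK
# server where Kibana listens on localhost:5601 and Elasticsearch on
# localhost:9200 (see the URL constants above); adjust the flags as needed:
#   python3 export_kibana_config.py --all --export
#   python3 export_kibana_config.py --dashboard --export --process
#   python3 export_kibana_config.py --estemplate --export --username redelk --password '<password>'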


class KibanaExporter:
"""
Kibana exporter
Use it to export data related to RedELK from Kibana and ES
"""
def __init__(self, cmd_line_args) -> None:
self.args = cmd_line_args
self.base_path = os.path.dirname(os.path.abspath(__file__))
self.env_file = os.path.join(self.base_path, "..", "elkserver", ".env")
if self.args.all:
self.kibana_objects = [
"index-pattern",
"search",
"visualization",
"dashboard",
"map",
]
else:
self.kibana_objects = []
if self.args.indexpattern:
self.kibana_objects.append("index-pattern")
if self.args.search:
self.kibana_objects.append("search")
if self.args.visualization:
self.kibana_objects.append("visualization")
if self.args.dashboard:
self.kibana_objects.append("dashboard")
if self.args.map:
self.kibana_objects.append("map")
        # Fall back to the redelk password from ../elkserver/.env when no
        # --password argument is given (expected line format: CREDS_redelk=<password>)
        env_file_password = None
        try:
            with open(self.env_file, "r", encoding="utf-8") as env_file:
                for line in env_file:
                    if "CREDS_redelk=" in line:
                        env_file_password = line.split("=", 1)[1].strip()
        except Exception as error:  # pylint: disable=broad-except
            print(f"Error opening password file: {error}")
self.kibana_user = (
cmd_line_args.username if cmd_line_args.username else "redelk"
)
self.kibana_password = (
cmd_line_args.password if cmd_line_args.password else env_file_password
)
self.export_path = (
cmd_line_args.exportpath
if cmd_line_args.exportpath
else os.path.join(
self.base_path,
"../elkserver/docker/redelk-base/redelkinstalldata/templates",
)
)
self.diff_exportpath = os.path.join(self.export_path, DIFF_PATH)
if not os.path.exists(self.diff_exportpath):
os.makedirs(self.diff_exportpath)

    def run_script(self):
"""
        Run the export and/or process steps based on the command-line arguments
"""
for kibana_object in self.kibana_objects:
if self.args.export:
self.fetch_kibana_object(kibana_object)
if self.args.process:
self.process_kibana_object(kibana_object)
if self.args.estemplate or self.args.all:
if self.args.export:
self.fetch_es_templates()

    def fetch_kibana_object(self, obj_type):
"""
        Fetch saved objects of the given type from the Kibana API and export the RedELK ones as NDJSON
"""
try:
print(f"# Fetching kibana objects: {obj_type}")
response = requests.post(
KIBANA_OBJECTS_EXPORT_URL,
json={"type": obj_type},
verify=False,
auth=(self.kibana_user, self.kibana_password),
headers={"kbn-xsrf": "true"},
)
if response.status_code != 200:
print(
f"!!! Error fetching kibana object {obj_type}: HTTP status code {response.status_code}"
)
else:
raw_data = response.text.encode("utf-8")
items = ndjson.loads(raw_data)
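                # Each NDJSON line is one saved object; a rough sketch of the
                # shape (exact fields vary per Kibana version):
                #   {"id": "...", "type": "<obj_type>", "attributes": {"title": "..."},
                #    "references": [...], ...}
                # Recent Kibana versions may also append an export-details summary
                # line (e.g. {"exportedCount": N, ...}); it has no "attributes" key,
                # so the filters below skip it.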
if obj_type != "index-pattern":
to_export = []
for ip in items: # pylint: disable=invalid-name
if "attributes" in ip.keys() and "title" in ip["attributes"]:
if re.match(
REDELK_OBJ_FILTER,
ip["attributes"]["title"],
re.IGNORECASE,
):
ip.pop("updated_at", None)
ip["version"] = "1"
to_export.append(ip)
export_file = os.path.join(
self.export_path,
f"{EXPORT_FILES_PREFIX_KIBANA}{obj_type}.ndjson",
)
print(f"\tExporting {obj_type}: {export_file}")
with open(export_file, "w", encoding="utf-8") as obj_file:
ndjson.dump(to_export, obj_file)
else:
for ip in items: # pylint: disable=invalid-name
if "attributes" in ip.keys() and "title" in ip["attributes"]:
if re.match(
INDEX_PATTERNS_FILTER,
ip["attributes"]["title"],
re.IGNORECASE,
):
# print('%s: %s' % (obj_type,ip['attributes']['title']))
index_pattern_name = (
ip["attributes"]["title"][:-2]
if ip["attributes"]["title"].endswith("-*")
else ip["attributes"]["title"]
)
ip.pop("updated_at", None)
ip["version"] = "1"
export_file = os.path.join(
self.export_path,
f"{EXPORT_FILES_PREFIX_KIBANA}{obj_type}_{index_pattern_name}.ndjson",
)
print(f"\tExporting {obj_type}: {export_file}")
with open(
export_file, "w", encoding="utf-8"
) as obj_file:
ndjson.dump([ip], obj_file)
except Exception as error: # pylint: disable=broad-except
print(f"!!! Error fetching kibana object {obj_type}: {error}")

    def fetch_es_templates(self):
"""
        Fetch Elasticsearch legacy, component, and index templates from the API
"""
for template_name in ES_TEMPLATES_LIST:
try:
print(f"# Fetching ES template: {template_name}")
response = requests.get(
f"{ES_URL}/_template/{template_name}",
verify=False,
auth=(self.kibana_user, self.kibana_password),
)
raw_data = response.text.encode("utf-8")
tmpl = json.loads(raw_data)
export_file = os.path.join(
self.export_path,
f"{EXPORT_FILES_PREFIX_ES}template_{template_name}.json",
)
print(f"\tExporting index template {template_name}: {export_file}")
with open(export_file, "w", encoding="utf-8") as template_file:
json.dump(
tmpl[template_name], template_file, indent=4, sort_keys=True
)
except Exception as error: # pylint: disable=broad-except
print(f"!!! Error fetching ES template {template_name}: {error}")
        # Get Elasticsearch component templates from the API
response = requests.get(
f"{ES_URL}/_component_template",
verify=False,
auth=(self.kibana_user, self.kibana_password),
)
json_data = response.json()
if "component_templates" in json_data:
for template in json_data["component_templates"]:
if "redelk" in template["name"]:
export_file = os.path.join(
self.export_path,
f"{EXPORT_FILES_PREFIX_ES}component_template_{template['name']}.json",
)
print(
f"\tExporting component template {template['name']}: {export_file}"
)
with open(export_file, "w", encoding="utf-8") as template_file:
json.dump(template, template_file, indent=4, sort_keys=True)
        # Get Elasticsearch index templates from the API
response = requests.get(
f"{ES_URL}/_index_template",
verify=False,
auth=(self.kibana_user, self.kibana_password),
)
json_data = response.json()
if "index_templates" in json_data:
for template in json_data["index_templates"]:
if template["name"] in ES_INDEX_TEMPLATES_LIST:
export_file = os.path.join(
self.export_path,
f"{EXPORT_FILES_PREFIX_ES}index_template_{template['name']}.json",
)
print(
f"\tExporting index template {template['name']}: {export_file}"
)
with open(export_file, "w", encoding="utf-8") as template_file:
json.dump(template, template_file, indent=4, sort_keys=True)

    def process_kibana_object(self, obj_type, indexpattern=None):
"""
        Convert exported NDJSON Kibana objects to pretty-printed JSON to ease diffing during commits
"""
print(f"# Processing kibana object: {obj_type}")
if obj_type != "index-pattern":
src_file_name = f"{EXPORT_FILES_PREFIX_KIBANA}{obj_type}"
else:
if indexpattern is None:
for i in INDEX_PATTERNS_FILTER.split("|"):
self.process_kibana_object(obj_type, indexpattern=i)
return
else:
src_file_name = f"{EXPORT_FILES_PREFIX_KIBANA}{obj_type}_{indexpattern}"
src_file = os.path.join(self.export_path, f"{src_file_name}.ndjson")
diff_file = os.path.join(self.export_path, DIFF_PATH, f"{src_file_name}.json")
print(f"\tOpening {obj_type}: {src_file}")
with open(src_file, "r", encoding="utf-8") as src_ndjson_file:
src_ndjson = ndjson.load(src_ndjson_file)
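        # Kibana stores several attributes (fields, searchSourceJSON, visState,
        # optionsJSON, panelsJSON) as embedded JSON strings; parse them below so
        # the diff file contains structured JSON instead of long escaped strings.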
for src_ndjson_line in src_ndjson:
if obj_type == "index-pattern":
src_ndjson_line["attributes"]["fields"] = sorted(
json.loads(src_ndjson_line["attributes"]["fields"]),
key=lambda x: x["name"],
)
elif obj_type == "search":
src_ndjson_line["attributes"]["kibanaSavedObjectMeta"][
"searchSourceJSON"
] = json.loads(
src_ndjson_line["attributes"]["kibanaSavedObjectMeta"][
"searchSourceJSON"
]
)
elif obj_type == "visualization":
src_ndjson_line["attributes"]["kibanaSavedObjectMeta"][
"searchSourceJSON"
] = json.loads(
src_ndjson_line["attributes"]["kibanaSavedObjectMeta"][
"searchSourceJSON"
]
)
src_ndjson_line["attributes"]["visState"] = json.loads(
src_ndjson_line["attributes"]["visState"]
)
elif obj_type == "dashboard":
src_ndjson_line["attributes"]["kibanaSavedObjectMeta"][
"searchSourceJSON"
] = json.loads(
src_ndjson_line["attributes"]["kibanaSavedObjectMeta"][
"searchSourceJSON"
]
)
src_ndjson_line["attributes"]["optionsJSON"] = json.loads(
src_ndjson_line["attributes"]["optionsJSON"]
)
src_ndjson_line["attributes"]["panelsJSON"] = json.loads(
src_ndjson_line["attributes"]["panelsJSON"]
)
print(f"\tWriting output to: {diff_file}")
with open(diff_file, "w", encoding="utf-8") as dst_json_file:
json.dump(src_ndjson, dst_json_file, indent=4, sort_keys=True)


def check_args():
"""
Checks arguments passed to the script
"""
    parser = argparse.ArgumentParser(
        description="Export RedELK Kibana saved objects and Elasticsearch templates"
    )
parser.add_argument(
"--exportpath",
metavar="<exportpath>",
dest="exportpath",
help="Path to export the objects",
)
parser.add_argument(
"--indexpattern", action="store_true", help="Export Kibana index patterns"
)
parser.add_argument("--search", action="store_true", help="Export Kibana searches")
parser.add_argument(
"--visualization", action="store_true", help="Export Kibana visualizations"
)
parser.add_argument(
"--dashboard", action="store_true", help="Export Kibana dashboards"
)
parser.add_argument("--map", action="store_true", help="Export Kibana maps")
parser.add_argument(
"--estemplate", action="store_true", help="Export Elasticsearch templates"
)
parser.add_argument(
"--all",
action="store_true",
help="Export all Kibana objects (similar to --indexpattern --search --visualizations --dashboards --estemplate --maps)",
)
parser.add_argument(
"--export",
action="store_true",
help="Export data (either --export of --process required)",
)
parser.add_argument(
"--process",
action="store_true",
help="Process locally saved NDJSON files for easy diff (either --export of --process required)",
)
parser.add_argument(
"--username",
metavar="<username>",
dest="username",
help="Elastic username, if not provided default 'redelk' is used",
)
parser.add_argument(
"--password",
metavar="<password>",
dest="password",
help="Elastic password, if not provided config file ../elkserver/.env will be parsed",
)
script_args = parser.parse_args()
# pylint: disable=too-many-boolean-expressions
    if (
        not script_args.indexpattern
        and not script_args.search
        and not script_args.visualization
        and not script_args.dashboard
        and not script_args.all
        and not script_args.estemplate
        and not script_args.map
    ):
        print("[X] Missing argument: select at least one object type or use --all")
        sys.exit(-1)
if not script_args.export and not script_args.process:
print("[X] Either --export of --process argument required")
sys.exit(-1)
return script_args


if __name__ == "__main__":
args = check_args()
exporter = KibanaExporter(args)
exporter.run_script()