tools/malware_research/custom_pipelines.py (68 lines of code) (raw):

import requests from typing import Any APIKEY = "<api key>" CLUSTER_URL = "https://<cluster name>.es.us-central1.gcp.cloud.es.io/" def send_post(uri: str, body: dict) -> dict: headers = { "Authorization" : f"ApiKey {APIKEY}" } url = f"{CLUSTER_URL}{uri}" r = requests.post(url=url, headers=headers, data=body) return r.json() def send_put(uri: str, input: dict) -> dict: headers = { "Authorization" : f"ApiKey {APIKEY}", "Content-Type" : "application/json" } url = f"{CLUSTER_URL}{uri}" r = requests.put(url=url, headers=headers, json=input) return r.json() def send_get(uri: str) -> dict: headers = { "Authorization" : f"ApiKey {APIKEY}" } url = f"{CLUSTER_URL}{uri}" r = requests.get(url=url, headers=headers) return r.json() def find_cutom_pipeline_processor(pipeline: dict[str, Any]) -> str: if not "_meta" in pipeline.keys(): return "" if not pipeline["_meta"]["managed_by"] == "fleet": return "" for p in pipeline["processors"]: if not "pipeline" in p.keys(): continue if p["pipeline"]["name"].endswith("@custom"): return p["pipeline"]["name"] return "" def get_custom_pipelines() -> list: uri = '_ingest/pipeline' pipelines = [] pipelines_response = send_get(uri) for name, pipeline in pipelines_response.items(): if name.startswith("logs-"): custom_processor = find_cutom_pipeline_processor(pipeline) if custom_processor: pipelines.append(custom_processor) return list(set(pipelines)) def create_pipeline(pipeline: str) -> dict: uri = f"_ingest/pipeline/{pipeline}" body = { "processors": [ { "pipeline": { "name": "process_local_events", "if": "ctx['data_stream']['namespace'] == 'local_malicious'" } } ] } result = send_put(uri, body) return result def main(): custom_pipelines = get_custom_pipelines() for p in custom_pipelines: print(p) print(create_pipeline(p)) if __name__ == "__main__": main()