tools/malware_research/enrich_policy_setup.py (235 lines of code) (raw):

import requests APIKEY = "<api key>" CLUSTER_URL = "https://<cluster name>.es.us-central1.gcp.cloud.es.io/" # (policy name, index filter, dataset) ENRICH_POLICYS = [ ("enrich_fingerprint_label_process_policy_local", "logs-endpoint.events.process-local_benign*", "endpoint.events.process"), ("enrich_fingerprint_label_file_policy_local", "logs-endpoint.events.file-local_benign*", "endpoint.events.file"), ("enrich_fingerprint_label_network_policy_local", "logs-endpoint.events.network-local_benign*", "endpoint.events.network"), ("enrich_fingerprint_label_auth_policy_local", "logs-system.auth-local_benign*", "system.auth"), ("enrich_fingerprint_label_syslog_policy_local", "logs-system.syslog-local_benign*", "system.syslog"), ("enrich_fingerprint_label_fim_policy_local", "logs-fim.event-local_benign*", "fim.event"), ("enrich_fingerprint_label_auditd_manager_policy_local", "logs-auditd_manager.auditd-local_benign*", "auditd_manager.auditd"), ("enrich_fingerprint_label_system_audit_package_policy_local", "logs-system_audit.package-local_benign*", "system_audit.package"), ("enrich_fingerprint_label_packetbeat_policy_local", "logs-network_traffic.*-local_benign*", "packetbeat"), ] LABEL_PIPELINE = "local_enrich_fingerprint_label" API_ENDPOINT_PIPELINE = "/_ingest/pipeline" API_ENDPOINT_ENRICH = "/_enrich/policy" ENRICH_QUERY = { "match": { "indices": "", "match_field": "fingerprint", "enrich_fields": [], "query": { "bool": { "must": [], "filter": [ ], "should": [], "must_not": [ { "match_phrase": { "process.parent.command_line": "/tmp/sample.elf" } }, { "match_phrase": { "process.name": "sample.elf" } }, { "match_phrase": { "file.path": "/tmp/sample.elf" } } ] } } } } INGEST_PIPELINE_CONTENT = { "description" : "Enrich local events with known benign data", "processors" : [ { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_process_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.event.dataset == \"endpoint.events.process\"", "ignore_failure": True, "description": "Process - add 'tag' data based on 'fingerprint'" } }, { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_file_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.event.dataset == \"endpoint.events.file\"", "ignore_failure": True, "description": "File - add 'tag' data based on 'fingerprint'" } }, { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_network_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.event.dataset == \"endpoint.events.network\"", "ignore_failure": True, "description": "Network - add 'tag' data based on 'fingerprint'" } }, { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_syslog_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.event.dataset == \"system.syslog\"", "ignore_failure": True, "description": "Syslog - add 'tag' data based on 'fingerprint'" } }, { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_auth_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.event.dataset == \"system.auth\"", "ignore_failure": True, "description": "Auth - add 'tag' data based on 'fingerprint'" } }, { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_fim_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.event.dataset == \"fim.event\"", "ignore_failure": True, "description": "Fim - add 'tag' data based on 'fingerprint'" } }, { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_auditd_manager_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.event.dataset == \"auditd_manager.auditd\"", "ignore_failure": True, "description": "Auditd Manager - add 'tag' data based on 'fingerprint'" } }, { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_system_audit_package_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.event.dataset == \"system_audit.package\"", "ignore_failure": True, "description": "Audit Package - add 'tag' data based on 'fingerprint'" } }, { "enrich": { "field": "fingerprint", "policy_name": "enrich_fingerprint_label_packetbeat_policy_local", "target_field": "enrich_label", "ignore_missing": True, "if": "ctx.agent.type == \"packetbeat\"", "ignore_failure": True, "description": "Packetbeat - add 'tag' data based on 'fingerprint'" } }, { "script": { "lang": "painless", "source": "if (ctx.containsKey(\"enrich_label\")) {\n ctx.known_benign = true;\n ctx.remove(\"enrich_label\");\n ctx.remove(\"enriched_fingerprint\");\n} else {\n ctx.known_benign = false;\n}", "ignore_failure": True, "description": "Creates a new boolean field called \"known_benign\"" } } ] } def send_delete(uri: str, ) -> dict: headers = { "Authorization" : f"ApiKey {APIKEY}" } url = f"{CLUSTER_URL}{uri}" r = requests.delete(url=url, headers=headers) return r.json() def send_put(uri: str, input: dict) -> dict: headers = { "Authorization" : f"ApiKey {APIKEY}", "Content-Type" : "application/json" } url = f"{CLUSTER_URL}{uri}" r = requests.put(url=url, headers=headers, json=input) return r.json() def send_post_empty(uri: str, params: dict) -> dict: headers = { "Authorization" : f"ApiKey {APIKEY}", "Content-Type" : "application/json" } url = f"{CLUSTER_URL}{uri}" r = requests.post(url=url, headers=headers, params=params) return r.json() def cleanup() -> None: uri = f"{API_ENDPOINT_PIPELINE}/{LABEL_PIPELINE}" send_delete(uri) for policy in ENRICH_POLICYS: uri = f"{API_ENDPOINT_ENRICH}/{policy[0]}" send_delete(uri) def create_enrich_policies() -> None: for policy in ENRICH_POLICYS: uri = f"{API_ENDPOINT_ENRICH}/{policy[0]}" query = ENRICH_QUERY.copy() print(query) query["indices"] = policy[1] if policy[2] == "packetbeat": query["match"]["query"]["bool"]["filter"].append({"match_phrase": {"agent.type": "packetbeat"}}) else: query["match"]["query"]["bool"]["filter"].append({"match_phrase": {"event.dataset": policy[2]}}) send_put(uri, query) def execute_enrich_policies() -> None: for policy in ENRICH_POLICYS: uri = f"{API_ENDPOINT_ENRICH}/{policy[0]}/_execute" send_post_empty(uri, {"wait_for_completion" : "false"}) def recreate_ingest_pipeline() -> None: uri = f"{API_ENDPOINT_PIPELINE}/{LABEL_PIPELINE}" send_put(uri, INGEST_PIPELINE_CONTENT) def main(): cleanup() create_enrich_policies() execute_enrich_policies() recreate_ingest_pipeline() if __name__ == "__main__": main()