tools/malware_research/enrich_policy_setup.py (235 lines of code) (raw):
import copy

import requests
# Elasticsearch API key; sent verbatim in the "Authorization: ApiKey ..."
# header of every request. Replace the placeholder before running.
APIKEY = "<api key>"
# Base URL of the target cluster. Request paths are appended to this value,
# so replace "<cluster name>" with the real deployment name.
CLUSTER_URL = "https://<cluster name>.es.us-central1.gcp.cloud.es.io/"
# One 3-tuple per enrich policy: (policy name, source index pattern, dataset).
# Policy names follow "enrich_fingerprint_label_<slug>_policy_local" and index
# patterns follow "logs-<stream>-local_benign*". The packetbeat entry is the
# only one whose third element differs from its stream name.
ENRICH_POLICYS = [
    (
        f"enrich_fingerprint_label_{slug}_policy_local",
        f"logs-{stream}-local_benign*",
        dataset,
    )
    for slug, stream, dataset in (
        ("process", "endpoint.events.process", "endpoint.events.process"),
        ("file", "endpoint.events.file", "endpoint.events.file"),
        ("network", "endpoint.events.network", "endpoint.events.network"),
        ("auth", "system.auth", "system.auth"),
        ("syslog", "system.syslog", "system.syslog"),
        ("fim", "fim.event", "fim.event"),
        ("auditd_manager", "auditd_manager.auditd", "auditd_manager.auditd"),
        (
            "system_audit_package",
            "system_audit.package",
            "system_audit.package",
        ),
        ("packetbeat", "network_traffic.*", "packetbeat"),
    )
]
# Name of the ingest pipeline this script deletes and re-creates.
LABEL_PIPELINE = "local_enrich_fingerprint_label"
# REST path prefixes; a pipeline or policy name is appended after a "/".
API_ENDPOINT_PIPELINE = "/_ingest/pipeline"
API_ENDPOINT_ENRICH = "/_enrich/policy"
# Template request body for creating a "match" enrich policy keyed on the
# "fingerprint" field. create_enrich_policies() copies this template and
# appends a per-policy clause to the (initially empty) "filter" list.
# The "must_not" clauses exclude events that mention sample.elf
# (presumably the sample under analysis -- confirm with the data owners).
# NOTE(review): "enrich_fields" is empty; confirm the cluster accepts that.
ENRICH_QUERY = {
    "match": {
        "indices": "",
        "match_field": "fingerprint",
        "enrich_fields": [],
        "query": {
            "bool": {
                "must": [],
                "filter": [],
                "should": [],
                "must_not": [
                    {"match_phrase": {field: phrase}}
                    for field, phrase in (
                        ("process.parent.command_line", "/tmp/sample.elf"),
                        ("process.name", "sample.elf"),
                        ("file.path", "/tmp/sample.elf"),
                    )
                ],
            }
        },
    }
}
def _build_label_pipeline() -> dict:
    """Assemble the ingest pipeline body stored in INGEST_PIPELINE_CONTENT.

    The pipeline holds one conditional enrich processor per policy, each
    looking up the event's "fingerprint" and writing any match to
    "enrich_label", followed by a script processor that turns the presence
    of "enrich_label" into a boolean "known_benign" field.
    """
    # (description label, enrich policy name, painless "if" condition)
    enrich_steps = (
        (
            "Process",
            "enrich_fingerprint_label_process_policy_local",
            'ctx.event.dataset == "endpoint.events.process"',
        ),
        (
            "File",
            "enrich_fingerprint_label_file_policy_local",
            'ctx.event.dataset == "endpoint.events.file"',
        ),
        (
            "Network",
            "enrich_fingerprint_label_network_policy_local",
            'ctx.event.dataset == "endpoint.events.network"',
        ),
        (
            "Syslog",
            "enrich_fingerprint_label_syslog_policy_local",
            'ctx.event.dataset == "system.syslog"',
        ),
        (
            "Auth",
            "enrich_fingerprint_label_auth_policy_local",
            'ctx.event.dataset == "system.auth"',
        ),
        (
            "Fim",
            "enrich_fingerprint_label_fim_policy_local",
            'ctx.event.dataset == "fim.event"',
        ),
        (
            "Auditd Manager",
            "enrich_fingerprint_label_auditd_manager_policy_local",
            'ctx.event.dataset == "auditd_manager.auditd"',
        ),
        (
            "Audit Package",
            "enrich_fingerprint_label_system_audit_package_policy_local",
            'ctx.event.dataset == "system_audit.package"',
        ),
        (
            # Packetbeat events are selected by agent type, not by dataset.
            "Packetbeat",
            "enrich_fingerprint_label_packetbeat_policy_local",
            'ctx.agent.type == "packetbeat"',
        ),
    )

    processors = [
        {
            "enrich": {
                "field": "fingerprint",
                "policy_name": policy_name,
                "target_field": "enrich_label",
                "ignore_missing": True,
                "if": condition,
                "ignore_failure": True,
                "description": f"{label} - add 'tag' data based on 'fingerprint'",
            }
        }
        for label, policy_name, condition in enrich_steps
    ]

    # Final step: set known_benign according to whether any enrich processor
    # added "enrich_label", and drop the temporary enrichment fields.
    processors.append(
        {
            "script": {
                "lang": "painless",
                "source": "if (ctx.containsKey(\"enrich_label\")) {\n ctx.known_benign = true;\n ctx.remove(\"enrich_label\");\n ctx.remove(\"enriched_fingerprint\");\n} else {\n ctx.known_benign = false;\n}",
                "ignore_failure": True,
                "description": "Creates a new boolean field called \"known_benign\"",
            }
        }
    )

    return {
        "description": "Enrich local events with known benign data",
        "processors": processors,
    }


# Body of the ingest pipeline that recreate_ingest_pipeline() uploads.
INGEST_PIPELINE_CONTENT = _build_label_pipeline()
def send_delete(uri: str, *, timeout: float = 30.0) -> dict:
    """Send an authenticated DELETE request to the cluster.

    Args:
        uri: Request path relative to ``CLUSTER_URL``, e.g.
            ``/_enrich/policy/<name>``. A leading "/" is optional.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` would block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body of the response. HTTP error statuses are not
        raised here; the caller receives whatever JSON the server returned.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    headers = {"Authorization": f"ApiKey {APIKEY}"}
    # CLUSTER_URL ends with "/" and the endpoint constants start with "/";
    # join them with exactly one slash so the path never begins with "//".
    url = f"{CLUSTER_URL.rstrip('/')}/{uri.lstrip('/')}"
    response = requests.delete(url=url, headers=headers, timeout=timeout)
    return response.json()
def send_put(uri: str, input: dict, *, timeout: float = 30.0) -> dict:
    """Send an authenticated PUT request with a JSON body to the cluster.

    Args:
        uri: Request path relative to ``CLUSTER_URL``, e.g.
            ``/_ingest/pipeline/<name>``. A leading "/" is optional.
        input: Object serialised as the JSON request body. The name shadows
            the ``input`` builtin but is kept so keyword callers still work.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` would block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body of the response. HTTP error statuses are not
        raised here; the caller receives whatever JSON the server returned.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    headers = {
        "Authorization": f"ApiKey {APIKEY}",
        "Content-Type": "application/json",
    }
    # CLUSTER_URL ends with "/" and the endpoint constants start with "/";
    # join them with exactly one slash so the path never begins with "//".
    url = f"{CLUSTER_URL.rstrip('/')}/{uri.lstrip('/')}"
    response = requests.put(
        url=url, headers=headers, json=input, timeout=timeout
    )
    return response.json()
def send_post_empty(uri: str, params: dict, *, timeout: float = 30.0) -> dict:
    """Send an authenticated POST request with no body to the cluster.

    Args:
        uri: Request path relative to ``CLUSTER_URL``, e.g.
            ``/_enrich/policy/<name>/_execute``. A leading "/" is optional.
        params: Query-string parameters appended to the URL.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` would block indefinitely on a stalled
            connection.

    Returns:
        The decoded JSON body of the response. HTTP error statuses are not
        raised here; the caller receives whatever JSON the server returned.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    headers = {
        "Authorization": f"ApiKey {APIKEY}",
        "Content-Type": "application/json",
    }
    # CLUSTER_URL ends with "/" and the endpoint constants start with "/";
    # join them with exactly one slash so the path never begins with "//".
    url = f"{CLUSTER_URL.rstrip('/')}/{uri.lstrip('/')}"
    response = requests.post(
        url=url, headers=headers, params=params, timeout=timeout
    )
    return response.json()
def cleanup() -> None:
    """Delete the label ingest pipeline and every configured enrich policy.

    The pipeline is deleted first, then the policies in ``ENRICH_POLICYS``
    order (presumably because the pipeline references those policies --
    confirm). The responses are discarded, so a "not found" reply on a
    fresh cluster does not stop the run.
    """
    targets = [f"{API_ENDPOINT_PIPELINE}/{LABEL_PIPELINE}"]
    targets.extend(
        f"{API_ENDPOINT_ENRICH}/{entry[0]}" for entry in ENRICH_POLICYS
    )
    for target in targets:
        send_delete(target)
def create_enrich_policies() -> None:
    """Create one enrich policy for each entry in ``ENRICH_POLICYS``.

    Each request body is an independent deep copy of ``ENRICH_QUERY`` with
    the policy's source index pattern written to ``match.indices`` and a
    single ``match_phrase`` filter added: on ``agent.type`` for the
    packetbeat entry, on ``event.dataset`` for every other entry. The
    finished body is printed before it is sent. Responses are discarded.
    """
    for name, index_pattern, dataset in ENRICH_POLICYS:
        # A deep copy is required: a shallow copy shares the nested
        # "filter" list, so clauses from earlier policies would pile up in
        # later requests and in the ENRICH_QUERY template itself.
        body = copy.deepcopy(ENRICH_QUERY)
        # The index pattern belongs inside the "match" object, next to
        # "match_field", not at the top level of the request body.
        body["match"]["indices"] = index_pattern
        if dataset == "packetbeat":
            clause = {"match_phrase": {"agent.type": "packetbeat"}}
        else:
            clause = {"match_phrase": {"event.dataset": dataset}}
        body["match"]["query"]["bool"]["filter"].append(clause)
        print(body)
        send_put(f"{API_ENDPOINT_ENRICH}/{name}", body)
def execute_enrich_policies() -> None:
    """Trigger execution of every enrich policy in ``ENRICH_POLICYS``.

    Each policy's ``_execute`` endpoint is called with
    ``wait_for_completion=false``; responses are discarded.

    NOTE(review): with wait_for_completion=false the executions may still be
    running when this function returns -- confirm that whatever runs next
    does not need the enrich indices to exist already.
    """
    for name, *_ in ENRICH_POLICYS:
        execute_path = f"{API_ENDPOINT_ENRICH}/{name}/_execute"
        send_post_empty(execute_path, {"wait_for_completion": "false"})
def recreate_ingest_pipeline() -> None:
    """Upload ``INGEST_PIPELINE_CONTENT`` as the ``LABEL_PIPELINE`` pipeline.

    The response from the cluster is discarded.
    """
    send_put(
        f"{API_ENDPOINT_PIPELINE}/{LABEL_PIPELINE}",
        INGEST_PIPELINE_CONTENT,
    )
def main():
    """Run the full setup: clean up, create and execute policies, add pipeline."""
    # Order matters: old objects are removed first, policies are created
    # and executed next, and the pipeline that uses them is uploaded last.
    steps = (
        cleanup,
        create_enrich_policies,
        execute_enrich_policies,
        recreate_ingest_pipeline,
    )
    for step in steps:
        step()
# Run the setup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()