#!/usr/bin/env python3
####################################################################
#
# anonymize a metricbeat dump
#
####################################################################
#
# This is a strange balance of openness and paranoia. And all hacky.
# When we see a field or log pattern we don't recognize we fail the
# processing entirely and raise an error so that a human can look at
# the new thing and decide if it should be changed. We're fairly
# willing to leave things, though we consume the bodies of many log
# messages and replace uuids and host names and ips and application
# names. But we let metric values pass right through. We need to let
# them through because TSDB is optimized for real world data.
#
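# Usage: stream newline-delimited JSON documents on stdin; anonymized
# NDJSON comes out on stdout and progress is logged to stderr. The
# input file name below is illustrative:
#
#   python3 anonymize.py < metricbeat-dump.ndjson > anonymized.ndjson
#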
####################################################################
import ipaddress
import json
import sys
import uuid
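# Replacement strategies. Each takes an original field value and returns a
# safe substitute; the memoizing strategies keep their mapping stable so the
# same input always anonymizes to the same output across documents.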
def passthrough(v):
return v
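# Replace each distinct value with "<prefix><n>", numbering values in the
# order they are first seen.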
def numbered(prefix):
numbers = {}
def number_it(v):
if v not in numbers:
numbers[v] = len(numbers)
return f"{prefix}{numbers[v]}"
return number_it
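# Replace each distinct id with a freshly generated random UUID.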
def uids():
uids = {}
def replace_uid(v):
if v not in uids:
uids[v] = uuid.uuid4()
return str(uids[v])
return replace_uid
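# Replace each distinct address with a synthetic IPv4 address, counting up
# from 0.0.0.0 in order of first appearance.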
def ips():
ips = {}
def replace_ip(v):
if v not in ips:
ips[v] = ipaddress.IPv4Address(len(ips))
return str(ips[v])
return replace_ip
def container_runtime(v):
if v == "docker":
return v
raise ValueError(f"unexpected service address [{v}]")
def service_type(v):
if v == "kubernetes":
return v
raise ValueError(f"unexepected service type [{v}]")
container_ids = {}
def container_id(id):
if id not in container_ids:
container_ids[id] = uuid.uuid4()
return str(container_ids[id])
def k8s_container_id(id):
if id.startswith("docker://"):
return "docker://" + container_id(id[len("docker://") :])
raise ValueError(f"unexepected k8s container prefix [{id}]")
K8S_IMAGE_PASSTHROUGH = {
"centos:7",
"gcr.io/cloud-builders/gsutil:latest",
"nginx:stable",
}
k8s_images_docker_es_co = numbered("docker.elastic.co/image-name-")
k8s_images_elastic = numbered("elastic/image-name-")
k8s_images_gradle = numbered("registry.replicated.com/gradleenterprise/image-name-")
k8s_images_other = numbered("anon/image-")
def k8s_container_image(img):
if img in K8S_IMAGE_PASSTHROUGH:
return img
if img.startswith("sha256:"):
return img
if img.startswith("docker.elastic.co/"):
return k8s_images_docker_es_co(img[len("docker.elastic.co/") :])
if img.startswith("elastic/"):
return k8s_images_docker_es_co(img[len("elastic/") :])
if img.startswith("registry.replicated.com/gradleenterprise/"):
return k8s_images_gradle(img[len("registry.replicated.com/gradleenterprise/") :])
if "elastic" in img:
raise ValueError(f"unexpected k8s container image [{img}]")
return k8s_images_other(img)
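# Event messages that are safe to keep verbatim.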
K8S_MESSAGE_PASSTHROUGH = {
"Back-off restarting failed container",
'Container image "centos:7" already present on machine',
"Created container check",
"deleting pod for node scale down",
"Error: ImagePullBackOff",
"Job has reached the specified backoff limit",
"Job was active longer than specified deadline",
"marked the node as toBeDeleted/unschedulable",
"No matching pods found",
"node removed by cluster autoscaler",
"Pod sandbox changed, it will be killed and re-created.",
"Pod was evicted by VPA Updater to apply resource recommendation.",
"Started container check",
"Starting kubelet.",
"Starting kube-proxy.",
"Too many active pods running after completion count reached",
"Updated load balancer with new hosts",
"Updated Node Allocatable limit across pods",
}
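# Substring-triggered redactions: if the key appears anywhere in the
# message, the whole message is replaced by the value.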
K8S_MESSAGE_SNIP = {
"event: Deleting": "Deleting node <snip>",
"event: Removing Node": "Removing node <snip>",
"event: Registered Node": "registered node <snip>",
"Created pod: ": "Create pod <snip>",
"Successfully assigned": "Successfully assigned <snip>",
"Created job": "Created job <snip>",
"Scaled up replica set": "Scaled up replica set",
"Pulling image": "Pulling image <snip>",
"Started container": "Started container <snip>",
"Successfully pulled image": "Successfully pulled image <snip>",
"Created container": "Created container <snip>",
"Cannot determine if job needs to be started": "Cannot determine if job needs to be started <snip>",
"Saw completed job": "Saw completed job <snip>",
"Deleted job": "Deleted job <snip>",
"AttachVolume.Attach succeeded for volume": "AttachVolume.Attach succeeded for volume <snip>",
"Scaled down replica set": "Scaled down replica set <snip>",
"Stopping container": "Stopping container <snip>",
"Deleted pod": "Deleted pod <snip>",
"Back-off pulling image": "Back-off pulling image <snip>",
"Liveness probe failed": "Liveness probe failed <snip>",
"delete Pod": "Delete pod <snip>",
"create Pod": "Create pod <snip>",
"status is now: NodeNotReady": "Node not ready <snip>",
"Readiness probe failed": "Readiness probe failed <snip>",
"Readiness probe errored": "Readiness probe errored <snip>",
"nodes are available:": "Bad nodes <snip>",
"Saw a job that the controller did not create or forgot": "Saw a job that the controller did not create or forgot <snip>",
"or sacrifice child": "Brutal murder by oomkiller",
"Scale-up": "Scale-up <snip>",
"Scale-down": "Scale-down <snip>",
"status is now": "<snip> status change <snip>",
"failed for volume": "<snip> failed for volume <snip>",
"pod triggered scale-up": "pod triggered scale-up <snip>",
"Unable to mount volumes for pod": "Unable to mount volumes for pod <snip>",
"Volume is already exclusively attached to one node": "Volume is already exclusively attached to one node <snip>",
"failed liveness probe, will be restarted": "<snip> failed liveness probe, will be restarted",
"Failed create pod sandbox": "Failed create pod sandbox <snip>",
"Error: cannot find volume": "Error: cannot find volume <snip>",
"Couldn't find key user in": "Couldn't find key user in <snip>",
}
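# Anonymize kubernetes.event.message: exact passthroughs first, then a few
# special cases, then substring snips; anything unrecognized fails loudly.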
def k8s_message(message):
if message in K8S_MESSAGE_PASSTHROUGH:
return message
if "Error: secret" in message and "not found" in message:
return "Error: secret <snip> not found"
if "Container image " in message and "already present on machine" in message:
return "Container image <snip> already present on machine"
if '"unmanaged"' in message:
return str(uuid.uuid4())
for trigger, replacement in K8S_MESSAGE_SNIP.items():
if trigger in message:
return replacement
raise ValueError(f"unsupported k8s.event.message [{message}]")
def k8s_event_generate_name(v):
if v == "":
return v
raise ValueError(f"unsupported k8s.event.generate_name [{v}]")
PASSTHROUGH_REASONS = {
"BackOff",
"BackoffLimitExceeded",
"CIDRAssignmentFailed",
"Created",
"DeadlineExceeded",
"EvictedByVPA",
"Failed",
"FailedAttachVolume",
"FailedCreatePodSandBox",
"FailedMount",
"FailedNeedsStart",
"FailedScheduling",
"Killing",
"NodeAllocatableEnforced",
"NodeHasNoDiskPressure",
"NodeHasSufficientMemory",
"NodeHasSufficientPID",
"NodeNotReady",
"NodeReady",
"NodeSysctlChange",
"NoPods",
None,
"OOMKilling",
"Pulled",
"Pulling",
"RegisteredNode",
"RemovingNode",
"SandboxChanged",
"SawCompletedJob",
"ScaleDown",
"ScaleDownEmpty",
"ScaledUpGroup",
"ScalingReplicaSet",
"Scheduled",
"Started",
"Starting",
"SuccessfulAttachVolume",
"SuccessfulCreate",
"SuccessfulDelete",
"TooManyActivePods",
"TriggeredScaleUp",
"UnexpectedJob",
"Unhealthy",
"UpdatedLoadBalancer",
}
def k8s_event_reason(v):
if v in PASSTHROUGH_REASONS:
return v
if "because it does not exist in the cloud provider" in v:
return "Deleting <snip> because it does not exist in the cloud provider"
raise ValueError(f"unsupported k8s.event.reason [{v}]")
def k8s_event_type(v):
if v in {"Normal", "Warning"}:
return v
raise ValueError(f"unsupported k8s.event.type [{v}]")
def k8s_system_container(v):
if v in {"kubelet", "pods", "runtime"}:
return v
raise ValueError(f"unsupported k8s.system.container [{v}]")
def k8s_labels_heritage(v):
if v in {"Helm", "Tiller"}:
return v
raise ValueError(f"unsupported k8s.labels.heritage [{v}]")
K8S_LABELS_K8S_APP_PASSTHROUGH = {
"dashboard-metrics-scraper",
"glbc",
"kubernetes-dashboard",
"kube-dns",
"kube-dns-autoscaler",
"metrics-server",
}
def k8s_labels_k8s_app(v):
if v in K8S_LABELS_K8S_APP_PASSTHROUGH:
return v
raise ValueError(f"unsupported k8s.labels.k8s-app [{v}]")
def k8s_labels_k8s_arch(v):
if v == "amd64":
return v
raise ValueError(f"unsupported kubernetes.labels.kubernetes_io/arch [{v}]")
def k8s_labels_k8s_os(v):
if v == "linux":
return v
raise ValueError(f"unsupported kubernetes.labels.kubernetes_io/os [{v}]")
def k8s_pod_status_phase(v):
if v in {"failed", "pending", "running", "succeeded"}:
return v
raise ValueError(f"unsupported kubernetes.pod.status.phase [{v}]")
k8s_labels_names = numbered("k8s-labels-name-")
def k8s_labels_name(v):
if v in {"glbc", "tiller"}:
return v
if v == "export-workday-logs-hourly":
return k8s_labels_names(v)
raise ValueError(f"unsupported kubernetes.labels.name [{v}]")
def k8s_labels_app_managed_by(v):
if v == "Tiller":
return v
raise ValueError(f"unsupported kubernetes.labels.app_kubernetes_io/managed-by [{v}]")
K8S_CONTAINER_STATUS_REASON = {
"Completed",
"ContainerCreating",
"CrashLoopBackOff",
"CreateContainerConfigError",
"ErrImagePull",
"Error",
"ImagePullBackOff",
"OOMKilled",
}
def k8s_container_status_reason(v):
if v in K8S_CONTAINER_STATUS_REASON:
return v
raise ValueError(f"unsupported k8s.container.status.reason [{v}]")
def metricbeat_error_message(message):
if "error doing HTTP request to fetch" in message and "Metricset data" in message:
return "Error fetching metrics <snip>"
if "decoding of metric family failed" in message:
return "Error fetching metrics <snip>"
raise ValueError(f"unsupported error.message [{message}]")
strategies = {
"@timestamp": passthrough,
"agent.ephemeral_id": uids(),
"agent.hostname": numbered("gke-apps-host-name-"),
"agent.id": uids(),
"agent.type": passthrough,
"agent.version": passthrough,
"container.id": container_id,
"container.runtime": container_runtime,
"ecs.version": passthrough,
"error.message": metricbeat_error_message,
"event.dataset": passthrough,
"event.duration": passthrough,
"event.module": passthrough,
"fields.cluster": passthrough,
"host.name": numbered("gke-apps-host-name-"),
"kubernetes.container.cpu.limit.cores": passthrough,
"kubernetes.container.cpu.request.cores": passthrough,
"kubernetes.container.cpu.usage.core.ns": passthrough,
"kubernetes.container.cpu.usage.limit.pct": passthrough,
"kubernetes.container.cpu.usage.nanocores": passthrough,
"kubernetes.container.cpu.usage.node.pct": passthrough,
"kubernetes.container.id": k8s_container_id,
"kubernetes.container.image": k8s_container_image,
"kubernetes.container.logs.available.bytes": passthrough,
"kubernetes.container.logs.capacity.bytes": passthrough,
"kubernetes.container.logs.inodes.count": passthrough,
"kubernetes.container.logs.inodes.free": passthrough,
"kubernetes.container.logs.inodes.used": passthrough,
"kubernetes.container.logs.used.bytes": passthrough,
"kubernetes.container.memory.available.bytes": passthrough,
"kubernetes.container.memory.majorpagefaults": passthrough,
"kubernetes.container.memory.limit.bytes": passthrough,
"kubernetes.container.memory.pagefaults": passthrough,
"kubernetes.container.memory.request.bytes": passthrough,
"kubernetes.container.memory.rss.bytes": passthrough,
"kubernetes.container.memory.usage.bytes": passthrough,
"kubernetes.container.memory.usage.limit.pct": passthrough,
"kubernetes.container.memory.usage.node.pct": passthrough,
"kubernetes.container.memory.workingset.bytes": passthrough,
"kubernetes.container.name": numbered("container-name-"),
"kubernetes.container.rootfs.available.bytes": passthrough,
"kubernetes.container.rootfs.capacity.bytes": passthrough,
"kubernetes.container.rootfs.inodes.used": passthrough,
"kubernetes.container.rootfs.used.bytes": passthrough,
"kubernetes.container.start_time": passthrough,
"kubernetes.container.status.phase": passthrough,
"kubernetes.container.status.ready": passthrough,
"kubernetes.container.status.reason": k8s_container_status_reason,
"kubernetes.container.status.restarts": passthrough,
"kubernetes.event.count": passthrough,
"kubernetes.event.involved_object.api_version": passthrough,
"kubernetes.event.involved_object.kind": passthrough,
"kubernetes.event.involved_object.name": numbered("involved-object-name-"),
"kubernetes.event.involved_object.resource_version": passthrough,
"kubernetes.event.involved_object.uid": uids(),
"kubernetes.event.message": k8s_message,
"kubernetes.event.metadata.generate_name": k8s_event_generate_name,
"kubernetes.event.metadata.name": numbered("event-metadata-name-"),
"kubernetes.event.metadata.namespace": numbered("event-metadata-namespace-"),
"kubernetes.event.metadata.resource_version": passthrough,
"kubernetes.event.metadata.self_link": numbered("event-metadata-self-link-"),
"kubernetes.event.metadata.timestamp.created": passthrough,
"kubernetes.event.metadata.uid": uids(),
"kubernetes.event.reason": k8s_event_reason,
"kubernetes.event.timestamp.first_occurrence": passthrough,
"kubernetes.event.timestamp.last_occurrence": passthrough,
"kubernetes.event.type": k8s_event_type,
"kubernetes.labels.app": numbered("label-app-"),
"kubernetes.labels.app_kubernetes_io/component": numbered("app_kubernetes_io/component-"),
"kubernetes.labels.app_kubernetes_io/instance": numbered("app_kubernetes_io/instance-"),
"kubernetes.labels.app_kubernetes_io/managed-by": k8s_labels_app_managed_by,
"kubernetes.labels.app_kubernetes_io/name": numbered("app_kubernetes_io/name-"),
"kubernetes.labels.beta_kubernetes_io/arch": k8s_labels_k8s_arch,
"kubernetes.labels.beta_kubernetes_io/fluentd-ds-ready": "drop",
"kubernetes.labels.beta_kubernetes_io/instance-type": numbered("beta_kubernetes_io/instance-type-"),
"kubernetes.labels.beta_kubernetes_io/os": k8s_labels_k8s_os,
"kubernetes.labels.chart": numbered("chart-"),
"kubernetes.labels.cloud_google_com/gke-nodepool": "drop",
"kubernetes.labels.cloud_google_com/gke-os-distribution": "drop",
"kubernetes.labels.co_elastic_apps_oblt-test-plans_service": "drop",
"kubernetes.labels.failure-domain_beta_kubernetes_io/region": "drop",
"kubernetes.labels.failure-domain_beta_kubernetes_io/zone": "drop",
"kubernetes.labels.component": numbered("label-component-"),
"kubernetes.labels.controller-revision-hash": numbered("label-controller-revision-hash-"),
"kubernetes.labels.controller-uid": uids(),
"kubernetes.labels.github_account": numbered("github-account-"),
"kubernetes.labels.helm_sh/chart": numbered("chart-"),
"kubernetes.labels.heritage": k8s_labels_heritage,
"kubernetes.labels.io_kompose_service": numbered("io_kopose_service-"),
"kubernetes.labels.job-name": numbered("job-name-"),
"kubernetes.labels.kubernetes_io/arch": k8s_labels_k8s_arch,
"kubernetes.labels.kubernetes_io/hostname": numbered("kubernetes_io/hostname-"),
"kubernetes.labels.kubernetes_io/os": k8s_labels_k8s_os,
"kubernetes.labels.k8s-app": k8s_labels_k8s_app,
"kubernetes.labels.logtype": passthrough,
"kubernetes.labels.llama": "drop",
"kubernetes.labels.name": k8s_labels_name,
"kubernetes.labels.pod-template-generation": passthrough,
"kubernetes.labels.pod-template-hash": numbered("label-pod-template-hash-"),
"kubernetes.labels.release": numbered("release-"),
"kubernetes.labels.statefulset_kubernetes_io/pod-name": numbered("statefulset_kubernetes_io/pod-name-"),
"kubernetes.labels.tier": numbered("tier-"),
"kubernetes.labels.watcher": "drop",
"kubernetes.labels.version": "drop",
"kubernetes.namespace": numbered("namespace"),
"kubernetes.node.name": numbered("gke-apps-node-name-"),
"kubernetes.node.cpu.allocatable.cores": passthrough,
"kubernetes.node.cpu.capacity.cores": passthrough,
"kubernetes.node.cpu.usage.core.ns": passthrough,
"kubernetes.node.cpu.usage.nanocores": passthrough,
"kubernetes.node.fs.available.bytes": passthrough,
"kubernetes.node.fs.capacity.bytes": passthrough,
"kubernetes.node.fs.inodes.count": passthrough,
"kubernetes.node.fs.inodes.free": passthrough,
"kubernetes.node.fs.inodes.used": passthrough,
"kubernetes.node.fs.used.bytes": passthrough,
"kubernetes.node.memory.allocatable.bytes": passthrough,
"kubernetes.node.memory.available.bytes": passthrough,
"kubernetes.node.memory.capacity.bytes": passthrough,
"kubernetes.node.memory.majorpagefaults": passthrough,
"kubernetes.node.memory.pagefaults": passthrough,
"kubernetes.node.memory.rss.bytes": passthrough,
"kubernetes.node.memory.usage.bytes": passthrough,
"kubernetes.node.memory.workingset.bytes": passthrough,
"kubernetes.node.network.rx.bytes": passthrough,
"kubernetes.node.network.rx.errors": passthrough,
"kubernetes.node.network.tx.bytes": passthrough,
"kubernetes.node.network.tx.errors": passthrough,
"kubernetes.node.pod.allocatable.total": passthrough,
"kubernetes.node.pod.capacity.total": passthrough,
"kubernetes.node.runtime.imagefs.available.bytes": passthrough,
"kubernetes.node.runtime.imagefs.capacity.bytes": passthrough,
"kubernetes.node.runtime.imagefs.used.bytes": passthrough,
"kubernetes.node.start_time": passthrough,
"kubernetes.node.status.ready": passthrough,
"kubernetes.node.status.unschedulable": passthrough,
"kubernetes.pod.cpu.usage.limit.pct": passthrough,
"kubernetes.pod.cpu.usage.nanocores": passthrough,
"kubernetes.pod.cpu.usage.node.pct": passthrough,
"kubernetes.pod.host_ip": ips(),
"kubernetes.pod.ip": ips(),
"kubernetes.pod.memory.available.bytes": passthrough,
"kubernetes.pod.memory.major_page_faults": passthrough,
"kubernetes.pod.memory.page_faults": passthrough,
"kubernetes.pod.memory.rss.bytes": passthrough,
"kubernetes.pod.memory.usage.bytes": passthrough,
"kubernetes.pod.memory.usage.limit.pct": passthrough,
"kubernetes.pod.memory.usage.node.pct": passthrough,
"kubernetes.pod.memory.working_set.bytes": passthrough,
"kubernetes.pod.network.rx.bytes": passthrough,
"kubernetes.pod.network.rx.errors": passthrough,
"kubernetes.pod.network.tx.bytes": passthrough,
"kubernetes.pod.network.tx.errors": passthrough,
"kubernetes.pod.name": numbered("pod-name-pod-name-"),
"kubernetes.pod.start_time": passthrough,
"kubernetes.pod.status.phase": k8s_pod_status_phase,
"kubernetes.pod.status.ready": passthrough,
"kubernetes.pod.status.scheduled": passthrough,
"kubernetes.pod.uid": uids(),
"kubernetes.replicaset.name": numbered("replicaset-name-"),
"kubernetes.statefulset.name": numbered("statefulset-name-"),
"kubernetes.system.container": k8s_system_container,
"kubernetes.system.cpu.usage.core.ns": passthrough,
"kubernetes.system.cpu.usage.nanocores": passthrough,
"kubernetes.system.memory.majorpagefaults": passthrough,
"kubernetes.system.memory.pagefaults": passthrough,
"kubernetes.system.memory.rss.bytes": passthrough,
"kubernetes.system.memory.usage.bytes": passthrough,
"kubernetes.system.memory.workingset.bytes": passthrough,
"kubernetes.system.start_time": passthrough,
"kubernetes.volume.fs.capacity.bytes": passthrough,
"kubernetes.volume.fs.available.bytes": passthrough,
"kubernetes.volume.fs.inodes.count": passthrough,
"kubernetes.volume.fs.inodes.free": passthrough,
"kubernetes.volume.fs.inodes.used": passthrough,
"kubernetes.volume.fs.used.bytes": passthrough,
"kubernetes.volume.name": numbered("volume-"),
"metricset.name": passthrough,
"metricset.period": passthrough,
"service.address": numbered("service-address-"),
"service.type": service_type,
}
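# Recursively anonymize a document, looking up the strategy for each leaf
# by its dotted field path. List values get the strategy applied per item.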
def anon(path, it):
result = {}
for k, v in it.items():
new_path = k if path == "" else path + "." + k
if isinstance(v, dict):
result[k] = anon(new_path, v)
continue
strategy = strategies.get(new_path)
if not strategy:
raise KeyError(f"Unknown key [{new_path}] with value [{v}]")
if strategy == "drop":
continue
        if isinstance(v, list):
result[k] = [strategy(item) for item in v]
continue
result[k] = strategy(v)
return result
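# Main loop: one JSON document per input line. Anonymized documents go to
# stdout; progress goes to stderr every 1000 documents so stdout stays
# valid NDJSON.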
count = 0
for line in sys.stdin:
try:
parsed = json.loads(line)
anonymized = anon("", parsed)
print(json.dumps(anonymized, separators=(",", ":")))
count += 1
if count % 1000 == 0:
print(f"Processed {count:012d} documents", file=sys.stderr)
except Exception as e:
raise Exception(f"Error processing {line}") from e