tsdb/_tools/dedupe.py (82 lines of code) (raw):

#!/usr/bin/env python3
####################################################################
#
# A tool that dedupes a sorted anonymized metricbeat dump.
#
####################################################################
#
# Expects sorted anonymized metricbeat dump as input via standard
# in and returns a deduped sorted anonymized metric beat output via
# standard out. Also separately generates 'dupes-' prefixed files
# per metric set name containing the dupes for manual inspection.
#
####################################################################
import json
import sys


def generate_event_key(parsed_line):
    """Identity key for an `event` doc: the involved object's UID."""
    return parsed_line["kubernetes"]["event"]["involved_object"]["uid"]


def generate_state_container_key(parsed_line):
    """Identity key for a `state_container` doc: container + pod + node
    names, plus the container id when present (names alone can repeat
    across container restarts)."""
    key = parsed_line["kubernetes"]["container"]["name"]
    key += parsed_line["kubernetes"]["pod"]["name"]
    key += parsed_line["kubernetes"]["node"]["name"]
    container_id = parsed_line.get("kubernetes", {}).get("container", {}).get("id")
    if container_id is not None:
        key += container_id
    return key


def generate_state_pod_key(parsed_line):
    """Identity key for a `state_pod` doc: pod name + node name."""
    return parsed_line["kubernetes"]["pod"]["name"] + generate_node_key(parsed_line)


def generate_container_key(parsed_line):
    """Identity key for a `container` doc: container + pod + node names."""
    return (
        parsed_line["kubernetes"]["container"]["name"]
        + parsed_line["kubernetes"]["pod"]["name"]
        + generate_node_key(parsed_line)
    )


def generate_volume_key(parsed_line):
    """Identity key for a `volume` doc: volume + pod + node names."""
    return (
        parsed_line["kubernetes"]["volume"]["name"]
        + parsed_line["kubernetes"]["pod"]["name"]
        + generate_node_key(parsed_line)
    )


def generate_pod_key(parsed_line):
    """Identity key for a `pod` doc: pod name + node name."""
    return parsed_line["kubernetes"]["pod"]["name"] + generate_node_key(parsed_line)


def generate_node_key(parsed_line):
    """Identity key for a `node` doc: the node name."""
    return parsed_line["kubernetes"]["node"]["name"]


def generate_system_key(parsed_line):
    """Identity key for a `system` doc: node name + system container name."""
    return generate_node_key(parsed_line) + parsed_line["kubernetes"]["system"]["container"]


def generate_state_node_key(parsed_line):
    """Identity key for a `state_node` doc: the node name."""
    return generate_node_key(parsed_line)


# Dispatch table: metricset name -> function deriving that doc's identity key.
generate_key_functions = {
    "event": generate_event_key,
    "state_container": generate_state_container_key,
    "state_pod": generate_state_pod_key,
    "container": generate_container_key,
    "volume": generate_volume_key,
    "pod": generate_pod_key,
    "node": generate_node_key,
    "system": generate_system_key,
    "state_node": generate_state_node_key,
}


def main():
    """Stream stdin to stdout, dropping docs whose (metricset, key) pair
    repeats within the same @timestamp.

    Docs carrying an "error" field are diverted to error_lines.json;
    dropped duplicates are appended to per-metricset dupes-<name>.json
    files for manual inspection. Progress is logged to stderr every
    100000 emitted docs. Raises on any malformed/unknown doc, chaining
    the offending line into the exception message.
    """
    in_count = 0
    error_count = 0
    out_count = 0
    current_timestamp = None
    keys = set()
    dupe_files = {}
    try:
        with open("error_lines.json", "a") as error_file:
            for line in sys.stdin:
                in_count += 1
                try:
                    parsed = json.loads(line)
                    line_timestamp = parsed["@timestamp"]
                    metric_set_name = parsed["metricset"]["name"]
                    if parsed.get("error") is not None:
                        error_count += 1
                        # `line` keeps its trailing newline; write() avoids
                        # the extra blank line that print() would insert.
                        error_file.write(line)
                        continue
                    generate_key_function = generate_key_functions[metric_set_name]
                    key = metric_set_name + generate_key_function(parsed)
                    if current_timestamp == line_timestamp:
                        if key in keys:
                            dupe_file_name = f"dupes-{metric_set_name}.json"
                            dupe_file = dupe_files.get(dupe_file_name)
                            if dupe_file is None:
                                # Dupe files are opened lazily, one per
                                # metricset, and kept open for the run.
                                dupe_file = open(dupe_file_name, "a")
                                dupe_files[dupe_file_name] = dupe_file
                            dupe_file.write(line)
                            continue
                        keys.add(key)
                    else:
                        # New timestamp: the input is sorted by @timestamp,
                        # so the dedupe window resets here.
                        current_timestamp = line_timestamp
                        keys = {key}
                    print(line, end="")
                    out_count += 1
                    if out_count % 100000 == 0:
                        print(
                            f"in {in_count:012d} docs, out {out_count:012d} docs, errors {error_count:012d}",
                            file=sys.stderr,
                        )
                except Exception as e:
                    raise Exception(f"Error processing {line}") from e
    finally:
        # Close lazily-opened dupe files even when processing raised.
        for dupe_file in dupe_files.values():
            dupe_file.close()


if __name__ == "__main__":
    main()