edas-demo/script/scrape-data-from-prometheus/scrape-data-from-prometheus.py

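"""Scrape 7 days of node- and pod-level metrics from a Prometheus instance and
write min/max/avg statistics to cluster-data.json and pod-data.json."""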
import datetime
import heapq
import json
import time

from prometheus_api_client import PrometheusConnect

# Node-level (node_exporter) queries.
PROMQL_CLUSTER_CPU_USAGE = """(1 - avg by(instance)(rate(node_cpu_seconds_total{mode="idle"}[1m]))) * 100"""
PROMQL_CLUSTER_MEM_USAGE = """(1 - avg by(instance)(node_memory_MemAvailable_bytes) / avg by(instance)(node_memory_MemTotal_bytes)) * 100"""
PROMQL_CLUSTER_DISK_READ = """sum(rate(node_disk_read_bytes_total[5m])) by (instance)"""
PROMQL_CLUSTER_DISK_WRITE = """sum(rate(node_disk_written_bytes_total[5m])) by (instance)"""
PROMQL_CLUSTER_LOAD15 = """node_load15"""

# Pod-level (kube-state-metrics / cAdvisor) queries. Note: container_name and
# pod_name are the legacy cAdvisor label names; clusters on Kubernetes 1.16+
# expose container and pod instead.
PROMQL_POD_RESTART = """kube_pod_container_status_restarts_total"""
PROMQL_POD_CPU_USAGE = """sum(rate(container_cpu_usage_seconds_total{container_name!="POD"}[1m])) by (pod_name, namespace) / sum(kube_pod_container_resource_limits_cpu_cores) by (pod_name, namespace) * 100"""
PROMQL_POD_MEM_USAGE = """sum(container_memory_working_set_bytes{container_name!="POD"}) by (pod_name, namespace) / sum(kube_pod_container_resource_limits_memory_bytes) by (pod_name, namespace) * 100"""

# Fill in the Prometheus URL.
PROM_URL = ""
# Fill in the Prometheus token.
PROM_AUTH_TOKEN = ""
# Use the current time as END_TIME.
END_TIME = int(time.time())
# Scrape data covering the past 7 days.
START_TIME = END_TIME - 7 * 24 * 3600
# Interval between data points, in seconds.
PROM_DATA_INTERNAL_STEP = 300
# For pod-level info, output the top N entries.
POD_INFO_TOP_N = 9999999


def get_end_datetime() -> datetime.datetime:
    return datetime.datetime.fromtimestamp(END_TIME)


def get_start_datetime() -> datetime.datetime:
    return datetime.datetime.fromtimestamp(START_TIME)


def query_range_prometheus(url, query, start_time, end_time, step, token):
    headers = {'Authorization': token}
    pc = PrometheusConnect(url, headers=headers)
    return pc.custom_query_range(query, start_time, end_time, step)


def query_prometheus(url: str, token: str, prom_ql: str) -> list:
    headers = {'Authorization': token}
    pc = PrometheusConnect(url, headers=headers)
    return pc.custom_query(prom_ql)


def scrape_cluster_data(prom_url: str, prom_token: str) -> dict:
    """Collect per-node min/max/avg statistics for every cluster-level query."""
    data = {}
    data["load15"] = query_and_process_data(prom_url, prom_token, PROMQL_CLUSTER_LOAD15)
    data["节点CPU使用率"] = query_and_process_data(prom_url, prom_token, PROMQL_CLUSTER_CPU_USAGE)
    data["节点内存使用率"] = query_and_process_data(prom_url, prom_token, PROMQL_CLUSTER_MEM_USAGE)
    data["节点磁盘读取(Bytes)"] = query_and_process_data(prom_url, prom_token, PROMQL_CLUSTER_DISK_READ)
    data["节点磁盘写入(Bytes)"] = query_and_process_data(prom_url, prom_token, PROMQL_CLUSTER_DISK_WRITE)
    # Regroup from {metric type -> {instance -> stats}} to {instance -> flat stats}.
    worker_data = {}
    for metric_type, values in data.items():
        for instance, v_tuple in values.items():
            d = worker_data.get(instance, {})
            v_min, v_max, v_avg = v_tuple
            d[f"{metric_type}-最小值"] = v_min
            d[f"{metric_type}-最大值"] = v_max
            d[f"{metric_type}-平均值"] = v_avg
            worker_data[instance] = d
    return worker_data


def query_and_process_data(prom_url: str, prom_token: str, prom_ql: str) -> dict:
    data = query_range_prometheus(prom_url, prom_ql, get_start_datetime(),
                                  get_end_datetime(), PROM_DATA_INTERNAL_STEP, prom_token)
    _, statistic_data = __extract_prom_data(data, "instance")
    return statistic_data


def __extract_prom_data(data: list, legend_key: str) -> tuple:
    """Split a range-query result into per-series (timestamps, values) plus min/max/avg stats,
    keyed by the label named by legend_key."""
    draw_data = {}
    statistic_data = {}
    for d in data:
        metric: dict = d["metric"]
        key = metric[legend_key]
        values: list = d["values"]
        timestamps = [datetime.datetime.fromtimestamp(int(v[0])) for v in values]
        values = [float(v[1]) for v in values]
        draw_data[key] = (timestamps, values)
        statistic_data[key] = get_min_max_avg(values)
    return draw_data, statistic_data


def get_min_max_avg(data: list) -> tuple:
    return min(data), max(data), sum(data) / len(data)
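# For reference, each element of the list returned by custom_query_range has the
# standard Prometheus range-query shape that __extract_prom_data unpacks above:
# a "metric" label map and "values" as [timestamp, "<string value>"] pairs.
# The label and sample values below are illustrative only:
#
#   {"metric": {"instance": "192.168.0.1:9100"},
#    "values": [[1700000000, "12.5"], [1700000300, "13.1"]]}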
def scrape_pod_data(prom_url: str, prom_token: str) -> dict:
    pod_data = {}
    pod_data["pod-restart"] = _query_pod_restart(prom_url, prom_token)
    pod_cpu_data = _query_pod_cpu(prom_url, prom_token)
    pod_mem_data = _query_pod_mem(prom_url, prom_token)
    # Index 1 of each stats tuple is the max, index 2 the average (index 0 is the min).
    pod_data["Pod CPU使用率最大值"] = get_top_n_element(POD_INFO_TOP_N, 1, pod_cpu_data)
    pod_data["Pod CPU使用率平均值"] = get_top_n_element(POD_INFO_TOP_N, 2, pod_cpu_data)
    pod_data["Pod内存使用率最大值"] = get_top_n_element(POD_INFO_TOP_N, 1, pod_mem_data)
    pod_data["Pod内存使用率平均值"] = get_top_n_element(POD_INFO_TOP_N, 2, pod_mem_data)
    return pod_data


def _query_pod_restart(prom_url: str, prom_token: str) -> list:
    """Return (pod, restart_count) pairs for every pod that has restarted at least once."""
    data = query_prometheus(prom_url, prom_token, PROMQL_POD_RESTART)
    result = []
    for d in data:
        pod = d["metric"]["pod"]
        v = int(d["value"][1])
        if v == 0:
            continue
        result.append((pod, v))
    return result


def _query_pod_cpu(prom_url: str, prom_token: str) -> dict:
    data = query_range_prometheus(prom_url, PROMQL_POD_CPU_USAGE, get_start_datetime(),
                                  get_end_datetime(), PROM_DATA_INTERNAL_STEP, prom_token)
    _, statistic_data = __extract_prom_data(data, "pod_name")
    return statistic_data


def _query_pod_mem(prom_url: str, prom_token: str) -> dict:
    data = query_range_prometheus(prom_url, PROMQL_POD_MEM_USAGE, get_start_datetime(),
                                  get_end_datetime(), PROM_DATA_INTERNAL_STEP, prom_token)
    _, statistic_data = __extract_prom_data(data, "pod_name")
    return statistic_data


def get_top_n_element(n: int, value_idx: int, data: dict) -> list:
    """Return the n (key, stat) pairs with the largest stats-tuple entry at value_idx."""
    pairs = list(data.items())
    top_n = heapq.nlargest(n, pairs, key=lambda x: x[1][value_idx])
    return [(v[0], v[1][value_idx]) for v in top_n]


if __name__ == '__main__':
    if PROM_URL == "" or PROM_AUTH_TOKEN == "":
        raise RuntimeError("请填写Prometheus访问信息")
    cluster_data = scrape_cluster_data(PROM_URL, PROM_AUTH_TOKEN)
    with open("cluster-data.json", "w") as f:
        f.write(json.dumps(cluster_data, ensure_ascii=False))
    pod_data = scrape_pod_data(PROM_URL, PROM_AUTH_TOKEN)
    with open("pod-data.json", "w") as f:
        f.write(json.dumps(pod_data, ensure_ascii=False))
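# A minimal sketch of an alternative way to supply the connection settings
# instead of editing the constants above, assuming environment variables named
# PROM_URL / PROM_AUTH_TOKEN (the variable names are illustrative, not something
# this script or prometheus_api_client defines):
#
#   import os
#   PROM_URL = os.environ.get("PROM_URL", "")
#   PROM_AUTH_TOKEN = os.environ.get("PROM_AUTH_TOKEN", "")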