tools/load-generator/main.py (105 lines of code) (raw):

#!/usr/bin/env python
"""Prometheus load generator.

For every query group in ``/etc/loadgen/config.yaml`` this starts one thread
against the ``pr`` Prometheus and one against the ``release`` Prometheus of a
given PR, issuing the group's queries every ``interval`` and exporting
latency / count metrics on port 8080.

Usage: ``main.py <namespace> <pr_number>`` with ``DOMAIN_NAME`` set in the
environment.
"""
import os
import sys
import threading
import time
from datetime import timedelta

import requests
import yaml
from prometheus_client import start_http_server, Histogram, Counter

# Set from argv[1] in main(); not otherwise read in this module.
namespace = ""
# Number of 404 responses a single Querier tolerates before the whole process
# is terminated (see Querier.query).
max_404_errors = 30
# Read at import time: the module cannot be imported without DOMAIN_NAME set.
domain_name = os.environ["DOMAIN_NAME"]

# Duration suffix -> timedelta keyword accepted by duration_seconds().
_DURATION_UNITS = {"s": "seconds", "m": "minutes", "h": "hours"}

# Query types understood by Querier; each maps to its HTTP API endpoint.
_QUERY_ENDPOINTS = {"instant": "query", "range": "query_range"}


class Querier:
    """
    Querier launches groups of queries against a Prometheus service.
    """

    query_duration = Histogram(
        "loadgen_query_duration_seconds",
        "Query duration",
        ["prometheus", "group", "expr", "type"],
        buckets=(0.05, 0.1, 0.3, 0.7, 1.5, 2.5, 4, 6, 8, 10, 13, 16, 20, 24,
                 29, 36, 42, 50, 60),
    )
    query_count = Counter(
        'loadgen_queries_total',
        'Total amount of queries',
        ["prometheus", "group", "expr", "type"],
    )
    query_fail_count = Counter(
        'loadgen_failed_queries_total',
        'Amount of failed queries',
        ["prometheus", "group", "expr", "type"],
    )

    def __init__(self, groupID, target, pr_number, qg):
        """Configure a querier for one query group against one Prometheus.

        Args:
            groupID: index of the group in the config; used in log lines.
            target: Prometheus name used in the URL path and the
                ``prometheus`` metric label (main() passes "pr" / "release").
            pr_number: PR identifier used as the first URL path segment.
            qg: group config mapping with keys ``name``, ``interval``,
                ``queries`` (list of mappings with ``expr``) and optional
                ``type`` ("instant" default, or "range"), ``start``/``end``
                (default "0h") and ``step`` (default "15s").

        Raises:
            ValueError: if ``type`` or a duration string is not recognised.
        """
        self.target = target
        self.name = qg["name"]
        self.groupID = groupID
        self.numberOfErrors = 0
        self.interval = duration_seconds(qg["interval"])
        self.queries = qg["queries"]
        self.type = qg.get("type", "instant")
        if self.type not in _QUERY_ENDPOINTS:
            # query() only adds start/end/step for "range"; any other value
            # would produce malformed requests, so reject it up front.
            raise ValueError("unknown query type %r in group %s"
                             % (self.type, self.name))
        # Offsets (seconds before "now") delimiting a range query's window.
        self.start = duration_seconds(qg.get("start", "0h"))
        self.end = duration_seconds(qg.get("end", "0h"))
        # Forwarded verbatim to Prometheus (e.g. "15s").
        self.step = qg.get("step", "15s")
        self.url = "http://%s/%s/prometheus-%s/api/v1/%s" % (
            domain_name, pr_number, target, _QUERY_ENDPOINTS[self.type])

    def run(self):
        """Loop forever, issuing every query of the group once per interval.

        Sleeps 20 seconds first so the target Prometheus can come up. If one
        pass takes longer than the interval, the next pass starts immediately.
        """
        print("run querier %s %s for %s" % (self.target, self.name, self.url))
        print("Waiting for 20 seconds to allow prometheus server (%s) to be properly set-up" % (self.url))
        time.sleep(20)
        while True:
            start = time.time()
            for q in self.queries:
                self.query(q["expr"])
            wait = self.interval - (time.time() - start)
            time.sleep(max(wait, 0))

    def query(self, expr):
        """Issue one query for ``expr`` and record the outcome in metrics.

        Every attempt increments ``loadgen_queries_total``. Only 200
        responses are observed in ``loadgen_query_duration_seconds``. A 404
        increments this querier's 404 counter and, once it reaches
        ``max_404_errors``, terminates the process. Other non-200 responses
        are only logged. Exceptions are counted in
        ``loadgen_failed_queries_total``, logged, and never propagated.
        """
        label_values = (self.target, self.name, expr, self.type)
        try:
            Querier.query_count.labels(*label_values).inc()
            start = time.time()
            params = {"query": expr}
            if self.type == "range":
                # Window is relative to the moment the query is sent.
                params["start"] = start - self.start
                params["end"] = start - self.end
                params["step"] = self.step
            resp = requests.get(self.url, params)
            dur = time.time() - start
            if resp.status_code == 404:
                print("WARNING :: GroupId#%d : Querier returned 404 for prometheus instance %s." % (self.groupID, self.url))
                self.numberOfErrors += 1
                if self.numberOfErrors == max_404_errors:
                    print("ERROR :: GroupId#%d : Querier returned 404 for prometheus instance %s %d times." % (self.groupID, self.url, max_404_errors))
                    # sys.exit() here would only end this thread; os._exit
                    # terminates the whole process, including other queriers.
                    os._exit(1)
            elif resp.status_code != 200:
                print("WARNING :: GroupId#%d : Querier returned %d for prometheus instance %s." % (self.groupID, resp.status_code, self.url))
            else:
                print("GroupId#%d : query %s %s, status=%s, size=%d, dur=%.3f" % (self.groupID, self.target, expr, resp.status_code, len(resp.text), dur))
                Querier.query_duration.labels(*label_values).observe(dur)
        except Exception as e:
            # Covers requests' IOError-derived errors and anything else, so a
            # single bad query never kills the querier thread.
            Querier.query_fail_count.labels(*label_values).inc()
            print("WARNING :: GroupId#%d : Could not query prometheus instance %s. \n %s" % (self.groupID, self.url, e))


def duration_seconds(s):
    """Convert a duration string such as "15s", "5m" or "2h" to seconds.

    Returns:
        The duration as a float number of seconds.

    Raises:
        ValueError: if the suffix is not s/m/h or the number is not an int.
    """
    unit = _DURATION_UNITS.get(s[-1:])
    if unit is None:
        raise ValueError("unknown duration %s" % s)
    return timedelta(**{unit: int(s[:-1])}).total_seconds()


def main():
    """Parse argv, start one querier thread per (target, group), serve metrics.

    Never returns; exits with status 2 on wrong arguments.
    """
    global namespace
    if len(sys.argv) < 3:
        print("unexpected arguments")
        print("usage: <load_generator> <namespace> <pr_number>")
        sys.exit(2)
    namespace = sys.argv[1]
    pr_number = sys.argv[2]

    # The config is a locally mounted file, not external input; FullLoader is
    # kept for compatibility with the existing config format.
    with open("/etc/loadgen/config.yaml", 'r') as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    print("loaded configuration")

    groups = config["querier"]["groups"]
    # Build every Querier before starting any thread: a malformed group then
    # fails here in the main thread instead of leaving earlier (non-daemon)
    # query threads running without the metrics endpoint.
    queriers = [Querier(i, target, pr_number, g)
                for target in ("pr", "release")
                for i, g in enumerate(groups)]
    for querier in queriers:
        threading.Thread(target=querier.run).start()

    start_http_server(8080)
    print("started HTTP server on 8080")
    while True:
        time.sleep(100)


if __name__ == "__main__":
    main()