mysql-test/t/sql_stats_snapshot_stress.py (120 lines of code) (raw):

import datetime import time import sys import MySQLdb from MySQLdb.constants import * import argparse import random import threading import traceback NUM_WORKERS = 10 def parse_args(): parser = argparse.ArgumentParser( 'sql_stats_snapshot_stress', description='Generate load populating sql stats and using snapshot', ) parser.add_argument( '--host', type=str, help='host name') parser.add_argument( '--port', type=int, help='port number') parser.add_argument( '--user', type=str, help='user name') parser.add_argument( '--password', default='', type=str, help='password') parser.add_argument( '--database', type=str, help='database to use') return parser.parse_args() def generate_load(worker): args = worker.args worker_id = worker.worker_id con = MySQLdb.connect(user=args.user, passwd=args.password, host=args.host, port=args.port, db=args.database) i = 0 if worker_id >= 2: op = 1 else: op = 2 while not worker.stop_flag: # op = random.randint(1, 10) start_time = datetime.datetime.now() cur = con.cursor() try: if op == 1: op_str = "insert" t = random.randint(1, 1000) table = "tt" + str(t) cur.execute("create temporary table %s (a int)" % table) stmt = "select * from " + table for n in range(0, 100): stmt = stmt + " union select " + str(n) cur.execute(stmt) cur.execute("drop table " + table) else: time.sleep(random.randint(1, 5)) op_str = "snapshot" print("WORKER %d: Starting %s %d ..." % (worker_id, op_str, i)) cur = con.cursor() cur.execute( \ "select s.execution_count, " \ " left(t.sql_text, 50) text " \ "from information_schema.sql_text t, " \ " information_schema.sql_statistics s, " \ " information_schema.client_attributes a " \ "where a.client_id=s.client_id and s.sql_id=t.sql_id " \ "order by 1 desc, 2") except (MySQLdb.OperationalError, MySQLdb.InternalError) as e: if not is_deadlock_error(e): raise e cur.close() delta = datetime.datetime.now() - start_time print("WORKER %d: Executed %s %d in %d ms" % (worker_id, op_str, i, \ delta.total_seconds() * 1000)) i += 1 con.close() class worker_thread(threading.Thread): def __init__(self, args, worker_id): threading.Thread.__init__(self) self.args = args self.exception = None self.worker_id = worker_id self.stop_flag = False self.start() def run(self): try: generate_load(self) except Exception as e: self.exception = traceback.format_exc() def stop(self): self.stop_flag = True def main(): args = parse_args() workers = [] for i in range(NUM_WORKERS): worker = worker_thread(args, i) workers.append(worker) time.sleep(60) for w in workers: w.stop() worker_failed = False for w in workers: w.join() if w.exception: print("worker hit an exception:\n%s\n'" % w.exception) worker_failed = True if worker_failed: sys.exit(1) if __name__ == '__main__': main()