in tools/perf-scale-workload/query_executer.py [0:0]
def run(self):
    """Run this thread's query-workload experiment.

    Repeatedly (``repetitions`` times, from config, default 100) picks a
    query at random from ``self.queriesSelected`` — weighted by
    ``self.queryWeights`` — executes it via ``executeQueryInstance``, and
    records per-query timings. Each query gets its own ``.log``/``.err``/
    ``.sql`` files under a per-experiment directory, and a CSV latency
    summary (avg, stddev, p50/p90/p99, geometric mean) is printed and
    written at the end. Side effects: creates directories/files under
    ``self.args.logDir`` and sets ``self.queryCount`` / ``self.output``.
    """
    logPrefix = "[{}]".format(self.threadId)
    databaseName = self.args.databaseName
    # Per-thread table override; fall back to the command-line table name.
    if self.tableName is None:
        tableName = self.args.tableName
    else:
        tableName = self.tableName
    repetitions = int(self.config.get(configDefaultSection, configRepetitions, fallback = 100))
    logDir = self.args.logDir
    runPrefix = self.args.runPrefix
    if not os.path.exists(logDir):
        os.makedirs(logDir)
    ## Create an experiment name for logging purposes.
    expName = "{}-{}-{}".format(runPrefix, self.startTime.strftime("%Y-%m-%d-%H-%M-%S"), self.threadId)
    expDirName = os.path.join(logDir, expName)
    if not os.path.exists(expDirName):
        os.makedirs(expDirName)
    print("Starting experiment {} at {}. Database: {}. Table: {}. Log files at: {}".format(
        expName, self.startTime, databaseName, tableName, expDirName))
    beginExperiment = timer()
    ## Start running the experiment
    self.logStats(expDirName, logPrefix, databaseName, tableName, "begin")
    queryLogFiles = dict()
    queryExecutionStats = dict()
    try:
        ## Generate the query strings and initialize other resources
        for query in self.queriesSelected:
            outFilePath = os.path.join(expDirName, "{0}.log".format(query))
            errFilePath = os.path.join(expDirName, "{0}.err".format(query))
            sqlFilePath = os.path.join(expDirName, "{0}.sql".format(query))
            outFile = open(outFilePath, "w")
            errFile = open(errFilePath, "w")
            queryLogFiles[query] = (outFile, errFile)
            queryExecutionStats[query] = QueryStats()
            # Persist the SQL text alongside its logs for later inspection.
            with open(sqlFilePath, "w") as file:
                file.write(self.queries[query])
        output = list()
        output.append('Query type, Total Count, Successful Count, Avg. latency (in secs), Std dev latency (in secs), Median, 90th perc (in secs), 99th Perc (in secs), Geo Mean (in secs)')
        queryCount = 0
        while queryCount < repetitions:
            # Reset so the error handler never reports a stale query name
            # if np.random.choice itself raises.
            queryToExecute = None
            try:
                queryCount += 1
                ## Randomly choose a query to execute
                queryToExecute = np.random.choice(self.queriesSelected, p=self.queryWeights)
                queryStr = self.queries[queryToExecute]
                print("{} {}. {}".format(logPrefix, queryCount, queryToExecute))
                result = executeQueryInstance(self.client, queryStr, queryCount, logPrefix=logPrefix,
                    thinkTimeMillis=self.args.thinkTimeMillis, randomizedThink = self.args.randomizedThink,
                    outFile=queryLogFiles[queryToExecute][0], errFile=queryLogFiles[queryToExecute][1])
                queryStat = queryExecutionStats[queryToExecute]
                queryStat.count += 1
                queryStat.success += result.success
                # Only successful executions contribute to latency stats.
                if result.success == 1:
                    queryStat.timings.append(result.timing)
            # Narrowed from a bare `except:` so Ctrl-C / SystemExit still
            # terminates the workload instead of being swallowed per-query.
            except Exception:
                # Bug fix: report the query that actually failed
                # (queryToExecute), not the stale `query` variable left
                # over from the setup loop above.
                print("Error executing query: ", queryToExecute)
                exc_type, exc_value, exc_traceback = sys.exc_info()
                traceback.print_exception(exc_type, exc_value, exc_traceback, limit=2, file=sys.stdout)
        print('\nSummary Results\n')
        for query in self.queriesSelected:
            queryStat = queryExecutionStats[query]
            if queryStat.success > 0:
                output.append('{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}'.format(query, queryStat.count, queryStat.success,
                    round(np.average(queryStat.timings), 3), round(np.std(queryStat.timings), 3),
                    round(np.percentile(queryStat.timings, 50), 3), round(np.percentile(queryStat.timings, 90), 3),
                    round(np.percentile(queryStat.timings, 99), 3), round(gmean(queryStat.timings), 3)))
            else:
                # No successful runs: emit counts only, leave stat columns blank.
                output.append('{0}, {1}, {2}, , , , ,'.format(query, queryStat.count, queryStat.success))
        print(os.linesep.join("{}".format(x) for x in output))
        summaryFile = os.path.join(expDirName, "{}-summary.csv".format(expName))
        with open(summaryFile, "w") as summary:
            summary.write(os.linesep.join("{}".format(x) for x in output))
        ## Get a count at the end of the experiment.
        self.logStats(expDirName, logPrefix, databaseName, tableName, "end")
        endExperiment = timer()
        print("Experiment {} completed. Time (seconds): {}. Log directory: {}".format(expName,
            round(endExperiment - beginExperiment, 2), expDirName))
        self.queryCount = queryCount
        self.output = output
    finally:
        # Always release the per-query log/err file handles, even when the
        # experiment aborts partway through.
        for key in queryLogFiles:
            val = queryLogFiles[key]
            val[0].close()
            val[1].close()