# Excerpt from tools/perf-scale-workload/continuous_ingester.py
def run(self):
    """Main loop of one ingestion worker thread.

    Repeatedly claims the next series id under the shared ``lock`` (advancing
    the shared ``timestamp`` when the id wraps around), generates one batch of
    records for that series via ``model``, and writes them with
    ``tswrite.writeRecords`` — either one record-set per request or
    accumulated into batches of ``args.batchSize``. Per-request latency is
    folded into streaming mean/variance (Welford) and a t-digest for
    percentile reporting every 1000 requests.

    Shared state: module-level ``seriesId``, ``timestamp``, ``sigInt`` and
    ``lock`` coordinate all worker threads. Exits when ``self.sigInt``, the
    global ``sigInt``, or ``self.event`` is set.
    """
    global seriesId
    global timestamp
    global lock
    idx = 0
    ## Welford streaming accumulators: running mean and M^2 (squared
    ## distance from the mean) of per-request latency.
    mean = 0.0
    squared = 0.0
    addReqId = self.args.addReqId
    ## The request id is attached either as a dimension or as a measure,
    ## never both.
    addReqIdAsDim = addReqId and self.args.addReqIdAsDim
    addReqIdAsMeasure = addReqId and not self.args.addReqIdAsDim
    writeRecordsBatch = list()  # records accumulated towards a full batch
    recordsToWrite = list()     # records handed to the next write request
    while True:
        with lock:
            ## Exit on thread-local interrupt, process-wide interrupt, or
            ## the coordinating shutdown event.
            if self.sigInt == True or sigInt == True or self.event.is_set():
                print("Thread {} exiting.".format(self.threadId))
                break
            seriesId += 1
            if seriesId >= self.numMetrics + self.numEvents:
                ## Wrapping around, so move to new timestamp.
                seriesId = 0
                newTimestamp = timestamp + self.args.intervalMillis
                currentTime = getCurrentTimestampMillis()
                ## Check if the timestamps are falling behind (more than 5%
                ## of the interval behind wall-clock time).
                if newTimestamp < currentTime - 0.05 * self.args.intervalMillis:
                    print("Can't keep up ingestion to the desired inter-event interval. Expected interval: {} ms. Actual: {} ms. Consider increasing concurrency or processes.".format(self.args.intervalMillis, currentTime - timestamp))
                    ## Move time forward.
                    timestamp = getTimestampMillis()
                else:
                    timestamp = newTimestamp
                    ## Check if we are ingesting too fast, then slow down.
                    if timestamp > currentTime - 1000:
                        ## Slow down until wall-clock catches up with the
                        ## logical timestamp.
                        sleepTimeSecs = int((timestamp - currentTime)/1000)
                        print("Thread {} sleeping for {} secs".format(self.threadId, sleepTimeSecs))
                        time.sleep(sleepTimeSecs)
                now = datetime.datetime.now()
                print("Resetting to first series from thread: [{}] at time {}. Timestamp set to: {}.".format(self.threadId, now.strftime("%Y-%m-%d %H:%M:%S"), timestamp))
            ## Snapshot the shared counters before releasing the lock.
            localSeriesId = seriesId
            localTimestamp = timestamp
        ## Series ids [0, numMetrics) are metric series; the remainder map
        ## onto event series.
        if localSeriesId < self.numMetrics:
            dimensions = model.createDimensionsEntry(self.dimensionMetrics[localSeriesId], addReqId=addReqIdAsDim)
            records = model.createRandomMetrics(localSeriesId, dimensions, localTimestamp, "MILLISECONDS", self.highUtilizationHosts, self.lowUtilizationHosts, wide=self.args.wide, addReqId=addReqIdAsMeasure)
        else:
            dimensions = model.createDimensionsEntry(self.dimensionEvents[localSeriesId - self.numMetrics], addReqId=addReqIdAsDim)
            records = model.createRandomEvent(dimensions, localTimestamp, "MILLISECONDS", wide=self.args.wide, addReqId=addReqIdAsMeasure)
        if self.args.batchWrites:
            if len(writeRecordsBatch) + len(records) <= self.args.batchSize:
                writeRecordsBatch.extend(records)
                ## Generate more data, unless we're wrapping around, at which
                ## point, drain any pending records.
                ## NOTE(review): seriesId is reset to 0 under the lock before
                ## it ever reaches numMetrics + numEvents, so localSeriesId is
                ## always below that cap and this condition looks always true
                ## — the wrap-around drain fall-through appears unreachable.
                ## Confirm the intended wrap condition (localSeriesId == 0?).
                if localSeriesId < self.numMetrics + self.numEvents:
                    continue
            else:
                ## Batch would overflow: transfer a subset of the freshly
                ## generated records into the batch.
                spaceRemaining = self.args.batchSize - len(writeRecordsBatch)
                assert(spaceRemaining < len(records))
                ## Transfer 0 - spaceRemaining - 1 to be written with this
                ## batch, and spaceRemaining - end in the next batch.
                ## If spaceRemaining is 0, then just write what we have
                ## accumulated so far.
                if spaceRemaining > 0:
                    writeRecordsBatch.extend(records[0:spaceRemaining])
                ## The batch is full, now we issue the write record request.
                recordsToWrite.clear()
                recordsToWrite.extend(writeRecordsBatch)
                writeRecordsBatch.clear()
                writeRecordsBatch.extend(records[spaceRemaining:])
        else:
            ## Unbatched mode: write each generated record set immediately.
            recordsToWrite.clear()
            recordsToWrite.extend(records)
        idx += 1
        start = timer()
        try:
            writeResult = tswrite.writeRecords(self.client, self.databaseName, self.tableName, recordsToWrite)
            self.recordsWritten += len(recordsToWrite)
            self.success += 1
        except Exception as e:
            print(e)
            exc_type, exc_value, exc_traceback = sys.exc_info()
            traceback.print_exception(exc_type, exc_value, exc_traceback, limit=2, file=sys.stdout)
            ## Only botocore ClientError-style exceptions carry .response;
            ## guard the access so diagnostics for any other exception type
            ## don't raise AttributeError and mask the original failure.
            response = getattr(e, 'response', None)
            if response is not None:
                requestId = "RequestId: {}".format(response['ResponseMetadata']['RequestId'])
                print(requestId)
            print(json.dumps(dimensions, indent=2))
            print(json.dumps(records, indent=2))
            ## Skip the success-path bookkeeping below; the finally block
            ## still records latency and increments the attempt count.
            continue
        finally:
            ## Latency stats are updated for failures too (count includes
            ## failed attempts).
            self.count += 1
            end = timer()
            cur = end - start
            self.digest.update(cur)
            self.sum += cur
            ## Computing the streaming M^2 (squared distance from mean)
            delta = cur - mean
            mean += delta / self.count
            squared += delta * (cur - mean)
            if self.count > 1:
                self.variance = float(squared / (self.count - 1))
        requestId = writeResult['ResponseMetadata']['RequestId']
        ## Periodic progress report with latency percentiles.
        if idx % 1000 == 0:
            now = datetime.datetime.now()
            print("{}. {}. {}. Last RequestId: {}. Avg={:,}, Stddev={:,}, 50thPerc={:,}, 90thPerc={:,}, 99thPerc={:,}. Records written: {}".format(
                self.threadId, idx, now.strftime("%Y-%m-%d %H:%M:%S"), requestId, round(self.sum / self.count, 3),
                round(math.sqrt(self.variance), 3), round(self.digest.percentile(50), 3),
                round(self.digest.percentile(90), 3), round(self.digest.percentile(99), 3), self.recordsWritten))