def run()

in tools/perf-scale-workload/continuous_ingester.py [0:0]


    def run(self):
        global seriesId
        global timestamp
        global lock

        idx = 0
        mean = 0.0
        squared = 0.0
        addReqId = self.args.addReqId
        addReqIdAsDim = addReqId and self.args.addReqIdAsDim
        addReqIdAsMeasure = addReqId and not self.args.addReqIdAsDim

        writeRecordsBatch = list()
        recordsToWrite = list()

        while True:
            with lock:
                if self.sigInt == True or sigInt == True or self.event.is_set():
                    print("Thread {} exiting.".format(self.threadId))
                    break

                seriesId += 1
                if seriesId >= self.numMetrics + self.numEvents:
                    ## Wrapping around, so move to new timestamp.
                    seriesId = 0
                    newTimestamp = timestamp + self.args.intervalMillis
                    currentTime = getCurrentTimestampMillis()
                    ## Check if the timestamps are falling behind
                    if newTimestamp < currentTime - 0.05 * self.args.intervalMillis:
                        print("Can't keep up ingestion to the desired inter-event interval. Expected interval: {} ms. Actual: {} ms. Consider increasing concurrency or processes.".format(self.args.intervalMillis, currentTime - timestamp))
                        ## Move time forward.
                        timestamp = getTimestampMillis()
                    else:
                        timestamp = newTimestamp
                        ## Check if we are ingesting too fast, then slow down.
                        if timestamp > currentTime - 1000:
                            ## Slow down
                            sleepTimeSecs = int((timestamp - currentTime)/1000)
                            print("Thread {} sleeping for {} secs".format(self.threadId, sleepTimeSecs))
                            time.sleep(sleepTimeSecs)

                    now = datetime.datetime.now()
                    print("Resetting to first series from thread: [{}] at time {}. Timestamp set to: {}.".format(self.threadId, now.strftime("%Y-%m-%d %H:%M:%S"), timestamp))

                localSeriesId = seriesId
                localTimestamp = timestamp

            if localSeriesId < self.numMetrics:
                dimensions = model.createDimensionsEntry(self.dimensionMetrics[localSeriesId], addReqId=addReqIdAsDim)
                records = model.createRandomMetrics(localSeriesId, dimensions, localTimestamp, "MILLISECONDS", self.highUtilizationHosts, self.lowUtilizationHosts, wide=self.args.wide,  addReqId=addReqIdAsMeasure)
            else:
                dimensions = model.createDimensionsEntry(self.dimensionEvents[localSeriesId - self.numMetrics], addReqId=addReqIdAsDim)
                records = model.createRandomEvent(dimensions, localTimestamp, "MILLISECONDS", wide=self.args.wide, addReqId=addReqIdAsMeasure)

            if self.args.batchWrites:
                if len(writeRecordsBatch) + len(records) <= self.args.batchSize:
                    writeRecordsBatch.extend(records)
                    ## Generate more data, unless we're wrapping around, at which point, drain any pending records.
                    if localSeriesId < self.numMetrics + self.numEvents:
                        continue
                else:
                    ## transfer a subset of values from the records produced into the batch
                    spaceRemaining = self.args.batchSize - len(writeRecordsBatch)
                    assert(spaceRemaining < len(records))
                    ## Transfer 0 - spaceRemaining - 1 to be written with this batch, and spaceRemaining - end in the next batch
                    ## If spaceRemaining is 0, then just write what we have accumulated so far
                    if spaceRemaining > 0:
                        writeRecordsBatch.extend(records[0:spaceRemaining])
                    ## The batch is full, now we issue the write record request.
                    recordsToWrite.clear()
                    recordsToWrite.extend(writeRecordsBatch)
                    writeRecordsBatch.clear()
                    writeRecordsBatch.extend(records[spaceRemaining:])
            else:
                recordsToWrite.clear()
                recordsToWrite.extend(records)

            idx += 1
            start = timer()
            try:
                writeResult = tswrite.writeRecords(self.client, self.databaseName, self.tableName, recordsToWrite)
                self.recordsWritten += len(recordsToWrite)
                self.success += 1
            except Exception as e:
                print(e)
                exc_type, exc_value, exc_traceback = sys.exc_info()
                traceback.print_exception(exc_type, exc_value, exc_traceback, limit=2, file=sys.stdout)
                requestId = "RequestId: {}".format(e.response['ResponseMetadata']['RequestId'])
                print(requestId)
                print(json.dumps(dimensions, indent=2))
                print(json.dumps(records, indent=2))
                continue
            finally:
                self.count += 1
                end = timer()
                cur = end - start
                self.digest.update(cur)
                self.sum += cur
                ## Computing the streaming M^2 (squared distance from mean)
                delta = cur - mean
                mean += delta / self.count
                squared += delta * (cur - mean)
                if self.count > 1:
                    self.variance = float(squared / (self.count - 1))

            requestId = writeResult['ResponseMetadata']['RequestId']
            if idx % 1000 == 0:
                now = datetime.datetime.now()
                print("{}. {}. {}. Last RequestId: {}. Avg={:,}, Stddev={:,}, 50thPerc={:,}, 90thPerc={:,}, 99thPerc={:,}. Records written: {}".format(
                    self.threadId, idx, now.strftime("%Y-%m-%d %H:%M:%S"), requestId, round(self.sum / self.count, 3),
                    round(math.sqrt(self.variance), 3), round(self.digest.percentile(50), 3),
                    round(self.digest.percentile(90), 3), round(self.digest.percentile(99), 3), self.recordsWritten))