in elastic/shared/track_processors/data_generator.py [0:0]
def _sample_corpus_stats(self):
    """Sample each configured corpus and summarise its size statistics.

    For every corpus name found under an integration's ``"corpora"``
    mapping in ``self._integration_ratios``, batches are pulled from
    ``self.readers[corpus_name]`` (used as a context manager and advanced
    with ``next``) until at least ``self._sample_size`` documents have
    been counted. Each batch is unpacked as
    ``(num_docs, docs, message_size)``.

    Returns:
        dict: corpus name -> dict with the keys ``sampled_docs``,
        ``avg_message_size``, ``avg_doc_size``, ``avg_doc_size_with_meta``
        and ``raw_json_ratio`` (total doc size / total message size).
        A ratio whose denominator is zero (nothing sampled, or no message
        bytes reported) is returned as ``0.0`` instead of raising
        ``ZeroDivisionError``; a warning is logged in that case.

    Raises:
        KeyError: if a corpus has no entry in ``self.readers``.
        StopIteration: propagated from ``next(reader)`` if the reader
            runs out before the requested sample size is reached.

    Note:
        A corpus listed under more than one integration is sampled once
        per listing; the last sample overwrites the earlier entry.
    """

    def _ratio(numerator, denominator):
        # True division, but tolerate an empty sample.
        return numerator / denominator if denominator else 0.0

    corpus_stats = {}
    self.logger.info("Sampling corpora...")
    for integration in self._integration_ratios.values():
        for corpus_name in integration["corpora"]:
            self.logger.info(
                "Sampling [%s] docs from corpus [%s]",
                self._sample_size,
                corpus_name,
            )
            corpus_reader = self.readers[corpus_name]
            with corpus_reader:
                sampled_docs = 0
                total_message_size = 0
                total_doc_size = 0
                total_doc_size_with_meta = 0
                while sampled_docs < self._sample_size:
                    num_docs, docs, message_size = next(corpus_reader)
                    sampled_docs += num_docs
                    total_message_size += message_size
                    for line_num, doc in enumerate(docs):
                        # Only odd-indexed entries carry size info;
                        # presumably even entries are bulk action/meta
                        # lines -- TODO confirm against the reader.
                        if line_num % 2 == 1:
                            rally_meta = doc["rally"]
                            total_doc_size += rally_meta["doc_size"]
                            total_doc_size_with_meta += rally_meta["doc_size_with_meta"]

                if not sampled_docs or not total_message_size:
                    self.logger.warning(
                        "Corpus [%s] yielded no documents or no message bytes; "
                        "affected stats are reported as 0.0",
                        corpus_name,
                    )
                corpus_stats[corpus_name] = {
                    "sampled_docs": sampled_docs,
                    "avg_message_size": _ratio(total_message_size, sampled_docs),
                    "avg_doc_size": _ratio(total_doc_size, sampled_docs),
                    "avg_doc_size_with_meta": _ratio(total_doc_size_with_meta, sampled_docs),
                    "raw_json_ratio": _ratio(total_doc_size, total_message_size),
                }
                self.logger.info(
                    "Stats for corpora [%s]: [%s]",
                    corpus_name,
                    json.dumps(corpus_stats[corpus_name]),
                )
    return corpus_stats