in elastic/shared/track_processors/data_generator.py [0:0]
def _json_processor(self, doc_bytes, _, corpus_name):
    # add any additional doc work here
    doc = json.loads(doc_bytes.decode("utf-8"))
    message_size = int(doc.pop("msglen", 0))
    remove_fields = self._exclude_properties.get(corpus_name, [])
    # drop corpus-specific excluded fields before measuring the doc size
    for field in remove_fields:
        if field in doc:
            del doc[field]
    # doc size after field removal; compact separators so whitespace is not counted
    doc_size = len(json.dumps(doc, separators=(",", ":")).encode("utf-8"))
doc["rally"] = {}
# Note: we rely on insertion order here for later efficient parsing - as of python 3.7 this is preserved
    if message_size == 0:
        # message_size is used for the throughput calculations. If msglen is included in the
        # corpus data, message_size is based on the raw data and traffic throughput will be
        # calculated from the raw message size; otherwise traffic throughput is calculated
        # from the serialized document size.
        doc["rally"]["message_size"] = doc_size
    else:
        doc["rally"]["message_size"] = message_size
doc["rally"]["doc_size"] = doc_size
    # the following is not used for enriched data stored on disk, only for _sample_corpus_stats():
    # the size including the rally metadata. Assign the doc size temporarily as a placeholder so
    # the field itself is counted in the measured length, then overwrite with the measured size
    # (see the sketch after this function).
    # Note: the doc size is only an estimate; ingest pipelines often modify the document, so it
    # is subject to change.
    if self.include_doc_size_with_metadata:
        doc["rally"]["doc_size_with_meta"] = doc_size
        doc["rally"]["doc_size_with_meta"] = len(json.dumps(doc, separators=(",", ":")).encode("utf-8"))
    self._append_doc_markers(doc)
    return doc, doc["rally"]["message_size"]
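
# --- Illustrative sketch (not part of the original file) ---
# A minimal, self-contained demonstration of the two-step doc_size_with_meta
# measurement used above: write a placeholder value first so the key (and a
# value of similar digit width) is counted in the serialized length, then
# overwrite with the measured size. All names below are hypothetical and for
# illustration only.
def _demo_doc_size_with_meta():
    import json

    # a toy document mimicking the enriched shape produced by _json_processor
    doc = {"message": "hello", "rally": {"doc_size": 25}}
    # step 1: placeholder so the field itself contributes to the measured length
    doc["rally"]["doc_size_with_meta"] = doc["rally"]["doc_size"]
    # step 2: measure with compact separators (matching the code above) and overwrite
    doc["rally"]["doc_size_with_meta"] = len(json.dumps(doc, separators=(",", ":")).encode("utf-8"))
    return doc

# Expected call shape for the method itself, given a generator instance that
# provides _exclude_properties, include_doc_size_with_metadata, and
# _append_doc_markers (instance construction is assumed here):
#   doc, message_size = generator._json_processor(raw_bytes, None, "corpus-name")
# where message_size equals the document's msglen when present, else its
# compact-serialized size.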