def _json_processor()

in elastic/shared/track_processors/data_generator.py [0:0]


    def _json_processor(self, doc_bytes, _, corpus_name):
        # add any additional doc work here
        doc = json.loads(doc_bytes.decode("utf-8"))
        message_size = int(doc.pop("msglen", 0))
        remove_fields = self._exclude_properties.get(corpus_name, [])
        if len(remove_fields) > 0:
            for field in remove_fields:
                if field in doc:
                    del doc[field]
            # doc size after field removal! careful with whitespace
        doc_size = len(json.dumps(doc, separators=(",", ":")).encode("utf-8"))
        doc["rally"] = {}
        # Note: we rely on insertion order here for later efficient parsing; as of Python 3.7 this is preserved
        if message_size == 0:
            # message_size is used for the throughput calculations.
            # If msglen is included in the corpus data, message_size is based on the raw data, and traffic
            # throughput will be calculated from the raw message size. Otherwise, traffic throughput is
            # calculated from the document size.
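            # e.g. a source line b'{"msglen": 512, "message": "..."}' (hypothetical) reports message_size == 512;
            # a line without msglen falls through to the doc_size assignment below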
            doc["rally"]["message_size"] = doc_size
        else:
            doc["rally"]["message_size"] = message_size
        doc["rally"]["doc_size"] = doc_size
        # the following is not used for enriched data stored on disk, only for _sample_corpus_stats():
        # the size with the rally meta; assign the doc size and timestamp temporarily so it's included
        # Note: the doc size is only an estimate. It is often modified by ingest pipelines and so is subject to change.
        if self.include_doc_size_with_metadata:
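            # set a placeholder first so the key itself is counted, then overwrite with the measured size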
            doc["rally"]["doc_size_with_meta"] = doc_size
            doc["rally"]["doc_size_with_meta"] = len(json.dumps(doc, separators=(",", ":")).encode("utf-8"))

        self._append_doc_markers(doc)
        return doc, doc["rally"]["message_size"]
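
A minimal sketch of how this processor might be exercised in isolation, assuming _json_processor is in scope as a plain function (its enclosing class is not shown here). The Host class, corpus name, and sample document are hypothetical stand-ins; only the attributes _json_processor actually reads are provided.

    import json
    import types

    class Host:
        # hypothetical stand-in exposing only what _json_processor touches
        def __init__(self):
            self._exclude_properties = {"my-corpus": ["agent"]}  # illustrative exclusions
            self.include_doc_size_with_metadata = True

        def _append_doc_markers(self, doc):
            pass  # the real method appends parsing markers; a no-op suffices for the sketch

    host = Host()
    # bind the free function to the stand-in instance
    host._json_processor = types.MethodType(_json_processor, host)

    raw = json.dumps({"msglen": 256, "message": "hello", "agent": "x"}).encode("utf-8")
    doc, message_size = host._json_processor(raw, None, "my-corpus")

    print(message_size)              # 256: msglen overrides the computed size
    print("agent" in doc)            # False: the excluded field was removed
    print(doc["rally"]["doc_size"])  # compact-serialized size after removal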