def _json_processor()

in elastic/shared/parameter_sources/processed.py [0:0]


    def _json_processor(self, doc: bytes, line_num: int, _: str) -> Tuple[str, int]:
        doc = doc.decode("utf-8").strip()
        if line_num % 2 == 0:
            return doc, 0
        # adds the timestamp to docs not metadata lines which will be in generated files
        timestamp = self._ts_generator.next_timestamp()
        # we assume date order - maybe speed this up for a boolean check on first request?
        if timestamp < self.min_timestamp:
            self.min_timestamp = timestamp
        self.max_timestamp = timestamp

        # see ProcessedCorpusParamSource for more details
        rallyts_start_pos = int(doc[MagicNumbers.RALLYTS_BEGIN_IDX : MagicNumbers.TS_BEGIN_IDX], 16)
        msglen_value_start_pos = int(doc[MagicNumbers.MSGLEN_BEGIN_IDX : MagicNumbers.MSGLEN_END_IDX], 16)
        msglen_value_end_pos = int(doc[MagicNumbers.MSGLEN_END_IDX : MagicNumbers.MSGLEN_END_IDX + 10], 16)

        msgsize = int(doc[msglen_value_start_pos:msglen_value_end_pos], 10)

        if rallyts_start_pos != -1:
            # doc["message"] contains _RALLYTS with timestamp format specification (most of integrations)

            rallyts_len = int(doc[rallyts_start_pos + MagicNumbers.RALLYTS_LEN : rallyts_start_pos + MagicNumbers.RALLYTSDATA_LEN_END], 10)

            ts_format = doc[
                rallyts_start_pos + MagicNumbers.RALLYTS_FORMAT_BEGIN : rallyts_start_pos + MagicNumbers.RALLYTS_FORMAT_BEGIN + rallyts_len
            ]

            # %s is spuriously supported/implemented, depending on the platform's implementation of strftime. Here we specifically implement handling to mean
            # timezone-less interpretation of the epoch
            if ts_format == "%s":
                # turns out float.__trunc__ is faster than builtins.int(<float>) per microbenchmark
                formatted_ts = timestamp.timestamp().__trunc__()
            else:
                formatted_ts = time.strftime(ts_format, timestamp.timetuple())

            # replace _RALLYTSNNN<...> with generated timestamp in the right format
            # and omit the "markers" key
            doc = (
                f"{doc[:rallyts_start_pos]}"
                f"{formatted_ts}"
                f"{doc[rallyts_start_pos + MagicNumbers.RALLYTS_FORMAT_BEGIN + rallyts_len + 1: MagicNumbers.MARKER_IDX]}"
                f"}}}}"
            )
        else:
            # no timestamp in message field e.g. application-logs, redis-slowlog-log
            # directly copy timestamp in a format compatible with the `date` ES field (`strict_date_optional_time`)

            ts_value_start_pos = int(doc[MagicNumbers.TS_BEGIN_IDX : MagicNumbers.TS_END_IDX], 16)
            ts_value_end_pos = int(doc[MagicNumbers.TS_END_IDX : MagicNumbers.MSGLEN_BEGIN_IDX], 16)

            formatted_ts = "%04d-%02d-%02dT%02d:%02d:%02d" % (
                timestamp.year,
                timestamp.month,
                timestamp.day,
                timestamp.hour,
                timestamp.minute,
                timestamp.second,
            )

            # prevent Elasticsearch achieving unrealistically high compression levels
            # by varying the millisecond suffix in the timestamp with minimal cpu overhead
            LARGEINT = 123132434 + line_num

            # replace @timestamp value with generated timestamp
            # and omit the "markers" key
            doc = (
                f"{doc[:ts_value_start_pos]}"
                f"""{formatted_ts}.{LARGEINT % 1000}Z"""
                f"{doc[ts_value_end_pos: MagicNumbers.MARKER_IDX]}"
                f"}}}}"
            )

        return doc, msgsize