in elastic/shared/parameter_sources/processed.py [0:0]
def _json_processor(self, doc: bytes, line_num: int, _: str) -> Tuple[str, int]:
    """Rewrite one line of a processed corpus file, injecting a freshly generated timestamp.

    Lines with an even ``line_num`` are metadata lines: they are decoded, stripped and
    returned unchanged with a message size of 0. Every other line is treated as a
    document:

    * a new timestamp is drawn from ``self._ts_generator.next_timestamp()``;
    * ``self.min_timestamp`` / ``self.max_timestamp`` are updated;
    * offsets stored as hex strings at fixed positions in the document (given by
      ``MagicNumbers``) locate the message-length value and either a ``_RALLYTS``
      placeholder or the ``@timestamp`` value;
    * the timestamp is spliced in, the text from ``MagicNumbers.MARKER_IDX`` onwards
      (the "markers" key) is dropped, and ``"}}"`` is appended.

    See ProcessedCorpusParamSource for the layout of the embedded offsets.

    :param doc: the raw line as bytes; it is decoded as UTF-8 and stripped.
    :param line_num: the line's number within the file. Its parity selects metadata
        vs document, and it seeds the millisecond suffix of ``@timestamp`` values.
    :param _: unused.
    :return: ``(line, msgsize)`` where ``line`` is the rewritten text and ``msgsize``
        is the integer parsed from the document's message-length field (0 for
        metadata lines).
    """
    text = doc.decode("utf-8").strip()
    if line_num % 2 == 0:
        # metadata line: passed through as-is, contributes no message size
        return text, 0

    # adds the timestamp to docs, not metadata lines, which will be in generated files
    timestamp = self._ts_generator.next_timestamp()
    # we assume date order, so only the minimum needs a comparison; the most recent
    # timestamp is always recorded as the maximum.
    # maybe speed this up with a boolean check on the first request?
    if timestamp < self.min_timestamp:
        self.min_timestamp = timestamp
    self.max_timestamp = timestamp

    # positions are stored as hex strings at fixed offsets inside the document itself
    # (see ProcessedCorpusParamSource for more details)
    rallyts_start_pos = int(text[MagicNumbers.RALLYTS_BEGIN_IDX : MagicNumbers.TS_BEGIN_IDX], 16)
    msglen_value_start_pos = int(text[MagicNumbers.MSGLEN_BEGIN_IDX : MagicNumbers.MSGLEN_END_IDX], 16)
    # NOTE(review): the end offset is read from a 10-character field; confirm this width
    # matches the writer in ProcessedCorpusParamSource.
    msglen_value_end_pos = int(text[MagicNumbers.MSGLEN_END_IDX : MagicNumbers.MSGLEN_END_IDX + 10], 16)
    msgsize = int(text[msglen_value_start_pos:msglen_value_end_pos], 10)

    if rallyts_start_pos != -1:
        # doc["message"] contains _RALLYTS with a timestamp format specification
        # (most integrations): a decimal length field, then the format itself
        rallyts_len = int(
            text[rallyts_start_pos + MagicNumbers.RALLYTS_LEN : rallyts_start_pos + MagicNumbers.RALLYTSDATA_LEN_END],
            10,
        )
        format_begin = rallyts_start_pos + MagicNumbers.RALLYTS_FORMAT_BEGIN
        ts_format = text[format_begin : format_begin + rallyts_len]
        if ts_format == "%s":
            # %s is spuriously supported/implemented depending on the platform's strftime,
            # so it is handled explicitly here as seconds since the epoch.
            # float.__trunc__ is faster than builtins.int(<float>) per microbenchmark.
            # NOTE(review): datetime.timestamp() interprets a naive datetime as local time;
            # the result is only timezone-independent if the generator yields aware
            # datetimes — confirm.
            formatted_ts = timestamp.timestamp().__trunc__()
        else:
            formatted_ts = time.strftime(ts_format, timestamp.timetuple())
        # replace _RALLYTSNNN<...> with the generated timestamp in the right format
        # (the +1 skips the closing delimiter after the format) and omit the "markers" key
        text = (
            f"{text[:rallyts_start_pos]}"
            f"{formatted_ts}"
            f"{text[format_begin + rallyts_len + 1 : MagicNumbers.MARKER_IDX]}"
            f"}}}}"
        )
    else:
        # no timestamp in the message field, e.g. application-logs, redis-slowlog-log:
        # write the timestamp directly in a format compatible with the `date` ES field
        # (`strict_date_optional_time`)
        ts_value_start_pos = int(text[MagicNumbers.TS_BEGIN_IDX : MagicNumbers.TS_END_IDX], 16)
        ts_value_end_pos = int(text[MagicNumbers.TS_END_IDX : MagicNumbers.MSGLEN_BEGIN_IDX], 16)
        formatted_ts = "%04d-%02d-%02dT%02d:%02d:%02d" % (
            timestamp.year,
            timestamp.month,
            timestamp.day,
            timestamp.hour,
            timestamp.minute,
            timestamp.second,
        )
        # prevent Elasticsearch achieving unrealistically high compression levels
        # by varying the millisecond suffix in the timestamp with minimal cpu overhead.
        # The suffix must be zero-padded to three digits: an unpadded 7 would render
        # as ".7", which date parsers read as 700 ms rather than 7 ms.
        millis = (123132434 + line_num) % 1000
        # replace the @timestamp value with the generated timestamp
        # and omit the "markers" key
        text = (
            f"{text[:ts_value_start_pos]}"
            f"{formatted_ts}.{millis:03d}Z"
            f"{text[ts_value_end_pos : MagicNumbers.MARKER_IDX]}"
            f"}}}}"
        )
    return text, msgsize