# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.

from copy import deepcopy
from typing import Any, Callable, Iterator, Optional, Union

from .json import json_dumper
from .logger import logger as shared_logger

# ExpandEventListFromFieldResolverCallable accepts an integration_scope and the field to expand the events list from
# as arguments. It returns the resolved name of the field to expand the events list from.
ExpandEventListFromFieldResolverCallable = Callable[[str, str], str]


class ExpandEventListFromField:
    def __init__(
        self,
        field_to_expand_event_list_from: str,
        integration_scope: str,
        field_resolver: ExpandEventListFromFieldResolverCallable,
        root_fields_to_add_to_expanded_event: Optional[Union[str, list[str]]] = None,
        last_event_expanded_offset: Optional[int] = None,
    ):
        self._last_event_expanded_offset: Optional[int] = last_event_expanded_offset
        self._root_fields_to_add_to_expanded_event = root_fields_to_add_to_expanded_event
        self._field_to_expand_event_list_from: str = field_resolver(integration_scope, field_to_expand_event_list_from)

    def _expand_event_list_from_field(
        self, json_object: dict[str, Any], starting_offset: int, ending_offset: int
    ) -> Iterator[tuple[Any, int, Optional[int], bool, bool]]:
        if len(self._field_to_expand_event_list_from) == 0 or self._field_to_expand_event_list_from not in json_object:
            yield None, starting_offset, 0, True, False
        else:
            events_list: list[Any] = json_object[self._field_to_expand_event_list_from]
            # set the length to 1 for an empty list to avoid a division by zero in the line below,
            # the for loop will not be executed anyway
            offset_skew = 0
            events_list_length = max(1, len(events_list))
            avg_event_length = (ending_offset - starting_offset) / events_list_length
            if self._last_event_expanded_offset is not None and len(events_list) > self._last_event_expanded_offset + 1:
                offset_skew = self._last_event_expanded_offset + 1
                events_list = events_list[offset_skew:]

            # let's compute root_fields_to_add_to_expanded_event only once for all the events to expand
            root_fields_to_add_to_expanded_event: dict[str, Any] = {}
            if self._root_fields_to_add_to_expanded_event == "all":
                root_fields_to_add_to_expanded_event = deepcopy(json_object)
                del root_fields_to_add_to_expanded_event[self._field_to_expand_event_list_from]
            # we want to add only a list of root fields
            elif isinstance(self._root_fields_to_add_to_expanded_event, list):
                for root_field_to_add_to_expanded_event in self._root_fields_to_add_to_expanded_event:
                    if root_field_to_add_to_expanded_event in json_object:
                        root_fields_to_add_to_expanded_event[root_field_to_add_to_expanded_event] = json_object[
                            root_field_to_add_to_expanded_event
                        ]
                    else:
                        shared_logger.debug(
                            f"`{root_field_to_add_to_expanded_event}` field specified in the "
                            "`root_fields_to_add_to_expanded_event` parameter is not present at root level"
                        )

            for event_n, event in enumerate(events_list):
                if self._root_fields_to_add_to_expanded_event:
                    # we can and want to add the root fields only in case the event is a non-empty json object
                    if isinstance(event, dict) and len(event) > 0:
                        # we want to add all the root fields
                        event.update(root_fields_to_add_to_expanded_event)
                    else:
                        shared_logger.debug("cannot add root fields to a non json object event")

                event_n += offset_skew
                yield event, int(
                    starting_offset + (event_n * avg_event_length)
                ), event_n, event_n == events_list_length - 1, True

    def expand(
        self, log_event: bytes, json_object: Optional[dict[str, Any]], starting_offset: int, ending_offset: int
    ) -> Iterator[tuple[bytes, int, int, Optional[int]]]:
        if json_object is None:
            yield log_event, starting_offset, ending_offset, None
        else:
            # expanded_ending_offset is set to the starting_offset because we want it to point to the beginning of
            # the json object in case of a message from the continuation queue: if we updated it while the payload
            # is continued, we would fetch the content of the payload from the middle of the json object, failing
            # to parse it
            expanded_ending_offset: int = starting_offset

            for (
                expanded_event,
                expanded_starting_offset,
                expanded_event_n,
                is_last_expanded_event,
                event_was_expanded,
            ) in self._expand_event_list_from_field(json_object, starting_offset, ending_offset):
                if event_was_expanded:
                    # empty values, once json dumped, might have a len() greater than 0, which would prevent
                    # them from being skipped later as empty values, so we yield a zero length bytes string
                    if not expanded_event:
                        expanded_log_event = b""
                    else:
                        expanded_log_event = json_dumper(expanded_event).encode("utf-8")

                    if is_last_expanded_event:
                        expanded_event_n = None
                        # only when we reach the last expanded event can we move the ending offset
                        expanded_ending_offset = ending_offset
                else:
                    expanded_event_n = None
                    expanded_log_event = log_event
                    expanded_ending_offset = ending_offset

                yield expanded_log_event, expanded_starting_offset, expanded_ending_offset, expanded_event_n
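

# ---------------------------------------------------------------------------
# Usage sketch (added for illustration, not part of the original module):
# a minimal example of how `ExpandEventListFromField.expand()` walks a decoded
# json object and yields one event per entry of the configured list field,
# copying the requested root fields into each expanded event. The identity
# lambda passed as `field_resolver` is a hypothetical stand-in for the
# integration-aware resolvers the forwarder normally supplies, and the
# "Records" payload below is made up for the demo. Because of the relative
# imports above, run it from the repository root as
# `python -m share.expand_event_list_from_field`.
if __name__ == "__main__":
    payload: bytes = b'{"source": "demo", "Records": [{"id": 1}, {"id": 2}]}'
    expander = ExpandEventListFromField(
        field_to_expand_event_list_from="Records",
        integration_scope="generic",
        field_resolver=lambda integration_scope, field: field,  # identity resolver (assumption)
        root_fields_to_add_to_expanded_event=["source"],
    )

    for expanded_log_event, start, end, event_n in expander.expand(
        log_event=payload,
        json_object={"source": "demo", "Records": [{"id": 1}, {"id": 2}]},
        starting_offset=0,
        ending_offset=len(payload),
    ):
        # each expanded event carries the "source" root field, the starting
        # offsets are interpolated between starting_offset and ending_offset,
        # and event_n is None on the last expanded event
        print(expanded_log_event, start, end, event_n)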