eventdata/parameter_sources/elasticlogs_bulk_source.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import copy
import logging
import random
from eventdata.parameter_sources.randomevent import RandomEvent

logger = logging.getLogger("track.eventdata")


class ElasticlogsBulkSource:
"""
Generates a bulk indexing request for elasticlogs data.
    It expects the parameter hash to contain the following keys:
        "bulk-size"            - Integer indicating the number of events generated per bulk request. (mandatory)
        "index"                - Name of the index, index prefix or alias documents should be indexed into. The
                                 index name can be made to generate time-based indices by including date formatting
                                 in the name. 'test-<yyyy>-<mm>-<dd>-<hh>' will generate an hourly index. (mandatory)
        "starting_point"       - String specifying the starting point for event time generation. It supports
                                 absolute or relative values as follows:
                                     'now'                 - Always evaluated to the current timestamp at time
                                                             of generation.
                                     'now-1h'              - Offset to the current timestamp. Consists of a number
                                                             and either m (minutes), h (hours) or d (days).
                                     '2017-02-20 20:12:32' - Exact timestamp.
                                     '2017-02-20'          - Date. Time will be assumed to be 00:00:00.
                                 If a relative starting point (based on now) is provided, it will be used for
                                 generation. If an exact timestamp is provided, the difference to now will be
                                 calculated when generation starts and used as an offset for all events.
                                 Defaults to 'now'.
        "acceleration_factor"  - Factor that alters the time progression in the timestamp calculation. A value
                                 larger than 1 will accelerate generation and a value lower than 1 will slow it
                                 down. If a task is set up to run indexing for one hour with a fixed starting
                                 point of '2017-02-20 20:12:32' and an acceleration factor of 2.0, events will be
                                 generated in timestamp sequence covering a 2-hour window, '2017-02-20 20:12:32'
                                 to '2017-02-20 22:12:32' (approximately).
        "id_type"              - Type of document id to use for generated documents. Defaults to `auto`.
                                     auto - Do not set an id explicitly and let Elasticsearch assign one
                                            automatically.
                                     seq  - Assign sequentially incrementing integer ids to each document.
        "id_seq_probability"   - Probability that an existing id is reused to simulate an update. Only applied
                                 when `id_type` is `seq`. Must be in the range [0.0, 1.0]. Defaults to 0.0,
                                 which means no updates are simulated.
        "id_seq_low_id_bias"   - If True, strongly favor low ids when picking ids for updates. Must be
                                 True/False. Defaults to False.
"""

    def __init__(self, track, params, **kwargs):
self.infinite = False
self.orig_args = [track, params, kwargs]
self._indices = track.indices
self._params = params
        # We could also use `kwargs.get("random_event", RandomEvent(params))` but that would call the constructor
        # eagerly, which we want to avoid because it can cause significant overhead.
if "random_event" in kwargs:
self._randomevent = kwargs["random_event"]
else:
self._randomevent = RandomEvent(params)
self._bulk_size = params["bulk-size"]
self.seq_id = 0
self._id_type = params.get("id_type", "auto")
if self._id_type not in ["auto", "seq"]:
raise AssertionError("The value [{}] is invalid for the parameter [id_type]".format(self._id_type))
if self._id_type == "seq":
self._id_seq_probability = float(params.get("id_seq_probability", 0.0))
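            # accept both a boolean and the strings "true"/"false" as the parameter value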
            self._low_id_bias = str(params.get("id_seq_low_id_bias", False)).lower() == "true"
if self._low_id_bias:
logger.info("Will use low id bias for updates")
else:
logger.info("Will use uniform distribution for updates")
self._default_index = False
if "index" not in params.keys():
index_name = self._indices[0].name
if len(self._indices) > 1:
logger.debug("[bulk] More than one index specified in track configuration. Will use the first one ({})".format(index_name))
else:
logger.debug("[bulk] Using index specified in track configuration ({})".format(index_name))
self._params["index"] = index_name
self._default_index = True
else:
logger.debug("[bulk] Index pattern specified in parameters ({}) will be used".format(params["index"]))

    def partition(self, partition_index, total_partitions):
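        # For non-sequential ids, reseed the module-level RNG per client so that runs
        # which provide an explicit "seed" parameter are reproducible.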
if self._params.get("id_type") != "seq":
seed = partition_index * self._params["seed"] if "seed" in self._params else None
random.seed(seed)
new_params = copy.deepcopy(self.orig_args[1])
new_params["client_id"] = partition_index
new_params["client_count"] = total_partitions
return ElasticlogsBulkSource(self.orig_args[0], new_params, **self.orig_args[2])

    @property
def percent_completed(self):
# progress is determined either by:
#
        # * the `time-period` or `iterations` property specified on the corresponding task
# * `#params()` raising `StopIteration` when `RandomEvent` is exhausted
return self._randomevent.percent_completed

    def params(self):
# Build bulk array
bulk_array = []
self._randomevent.start_bulk(self._bulk_size)
        for _ in range(self._bulk_size):
try:
evt, idx, typ = self._randomevent.generate_event()
except StopIteration:
                if bulk_array:
# return any remaining items if there are any (otherwise we'd lose the last bulk request)
break
else:
# otherwise stop immediately
raise
if self._id_type == "auto":
bulk_array.append('{"index": {"_index": "%s"}}' % idx)
else:
docid = "%s-%d" % (self.__get_seq_id(), self._params["client_id"])
bulk_array.append('{"index": {"_index": "%s", "_id": "%s"}}' % (idx, docid))
bulk_array.append(evt)
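        # The assembled body is newline-delimited JSON, e.g. (illustrative):
        #   {"index": {"_index": "elasticlogs-2017-02-20", "_id": "000000000042-0"}}
        #   {"@timestamp": "2017-02-20T20:12:32.000Z", ...}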
response = {
"body": "\n".join(bulk_array),
"action-metadata-present": True,
# the bulk array contains the action-and-metadata line and the actual document
"bulk-size": len(bulk_array) // 2,
"unit": "docs"
}
if "pipeline" in self._params.keys():
response["pipeline"] = self._params["pipeline"]
return response

    def __get_seq_id(self):
_id = self.seq_id
if random.uniform(0, 1) < self._id_seq_probability:
# conflict
if self._low_id_bias:
# update; heavily bias towards older ids
_p = 10
_min = 0
_max = _id
                # _p >> 1: results biased towards _min; _p close to 0: results biased towards _max
_id = _min + (_max - _min) * pow(random.random(), _p)
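                # e.g. with _p = 10, half of all draws land below _min + (_max - _min) * 0.5 ** 10,
                # i.e. within the lowest ~0.1% of the id range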
            else:
                # update; pick an id from a uniform distribution over all existing ids
                _id = random.randint(0, _id - 1 if _id > 0 else 0)
else:
# new document
self.__incr_seq_id()
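        # zero-pad to a fixed width; the low-id-bias branch above yields a float,
        # which %d truncates to an integer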
return "%012d" % _id

    def __incr_seq_id(self):
self.seq_id += 1
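

# A minimal usage sketch (hypothetical): Rally normally instantiates this parameter
# source itself, so the stub track and stub event generator below are illustrative
# assumptions, not part of the real eventdata track.
if __name__ == "__main__":
    import types

    class _StubRandomEvent:
        # stands in for RandomEvent so the sketch runs without any track data
        percent_completed = None

        def start_bulk(self, bulk_size):
            self._remaining = bulk_size

        def generate_event(self):
            if self._remaining == 0:
                raise StopIteration()
            self._remaining -= 1
            # (event JSON, target index, type) as expected by params()
            return '{"message": "example"}', "elasticlogs-2017-02-20", "doc"

    stub_track = types.SimpleNamespace(indices=[])
    params = {"bulk-size": 3, "index": "elasticlogs", "id_type": "seq"}
    source = ElasticlogsBulkSource(stub_track, params, random_event=_StubRandomEvent())
    # partition() assigns the client_id used as the document id suffix
    client = source.partition(partition_index=0, total_partitions=1)
    print(client.params()["body"])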