in elastic/shared/parameter_sources/processed.py
def __init__(self, track, params, **kwargs):
self.logger = logging.getLogger(__name__)
self._orig_args = [track, params, kwargs]
# exposed as a public attribute so it can be overridden for testing only
self.track = track
self._complete = False
self.infinite = False
self._params = params
self.kwargs = kwargs
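# "track-id" and "random-seed" are read from the selected challenge's parameters in the track
# definition, e.g. (illustrative values only): "parameters": {"track-id": "logs", "random-seed": 13}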
self._track_id = track.selected_challenge_or_default.parameters["track-id"]
self._random_seed = track.selected_challenge_or_default.parameters.get("random-seed")
self.logger.info(f"Using track id {self._track_id}")
raw_volume_per_day = mandatory(
track.selected_challenge_or_default.parameters,
"raw-data-volume-per-day",
"bulk",
)
self._volume_per_day_gb = convert_to_gib(raw_volume_per_day)
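# e.g. a value such as "100GB" would yield roughly 93.13 here, assuming convert_to_gib
# parses a human-readable size string and returns the volume in gibibytes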
self.start_time = int(time.perf_counter())
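# note that perf_counter uses an arbitrary reference point, so this value is only
# meaningful for computing elapsed time later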
self._profile = params.get("profile", "fixed_interval")
now = datetime.now(timezone.utc)
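# freeze the reference time so that both dates below resolve against the same instant
# (parse_date_time takes a utcnow hook, presumably so relative date expressions resolve consistently)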
def utc_now():
return now
if params.get("init-load", False):
# "init-load" is an undocumented parameter that causes bulk-start-date and bulk-end-date to be used for
# pre-loading datasets in cases where multiple challenges cannot be run
self._start_date = parse_date_time(
track.selected_challenge_or_default.parameters.get("bulk-start-date", DEFAULT_START_DATE),
utcnow=utc_now,
)
self._end_date = parse_date_time(
track.selected_challenge_or_default.parameters.get("bulk-end-date", DEFAULT_END_DATE),
utcnow=utc_now,
)
else:
self._start_date = parse_date_time(
track.selected_challenge_or_default.parameters.get("start-date", DEFAULT_START_DATE),
utcnow=utc_now,
)
self._end_date = parse_date_time(
track.selected_challenge_or_default.parameters.get("end-date", DEFAULT_END_DATE),
utcnow=utc_now,
)
self.logger.info(
"Using date range [%s] to [%s] for " "indexing",
self._start_date.isoformat(),
self._end_date.isoformat(),
)
if self._start_date > self._end_date:
raise exceptions.TrackConfigError(
f'"start-date" cannot be greater than "end-date" for operation ' f"\"{self._params.get('operation-type')}\""
)
self._number_of_days = (self._end_date - self._start_date).total_seconds() / 86400
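# fractional days are preserved, e.g. a 36-hour range yields 1.5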
self._time_format = params.get("time-format", "milliseconds")
self._processor = self._json_processor
# bulk bodies already contain the metadata lines, and ID conflicts are not needed for solution tracks and data streams
self.id_conflicts = IndexIdConflict.NoConflicts
self.conflict_probability = None
self.on_conflict = None
self.recency = None
# pipeline handling is deferred to the bulk bodies themselves, as the corpora are blended
self.pipeline = None
try:
self.bulk_size = int(mandatory(params, "bulk-size", "bulk"))
if self.bulk_size <= 0:
raise exceptions.InvalidSyntax(f'"bulk-size" must be positive but was {self.bulk_size}')
# each document consists of a metadata line and a source line
self.lines_per_bulk = self.bulk_size * 2
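# e.g. a bulk-size of 5000 yields 10000 lines, matching the NDJSON bulk format of one
# action/metadata line followed by one source line per document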
except KeyError:
raise exceptions.InvalidSyntax('Mandatory parameter "bulk-size" is missing')
except ValueError:
raise exceptions.InvalidSyntax('"bulk-size" must be numeric')
self.corpus = next(
(c for c in track.corpora if c.meta_data.get("generated", False)),
None,
)
self._reset_timestamps()
# TODO: Validate this exists and has files
self.current_docs = 0
# Set to 1 to avoid division by zero if percent_completed is called before this parameter has
# been initialized to its actual value (i.e. before the first call to #params()).
self.docs_per_client = 1
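
# A minimal usage sketch (hypothetical class name, illustration only): assuming this __init__
# belongs to a Rally parameter source registered in the track's register() hook via
# registry.register_param_source, Rally would construct it once per operation with the current
# track and the operation's params, e.g.
#
#   source = ProcessedCorpusParamSource(
#       current_track,
#       {"bulk-size": 5000, "profile": "fixed_interval", "time-format": "milliseconds"},
#   )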