in elastic/shared/parameter_sources/workflow_selector.py (WorkflowSelectorParamSource.__init__)
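# NOTE: this excerpt shows __init__ only. It assumes the module-level imports
# used below: glob, json, logging, os, random; datetime, timedelta, timezone
# from datetime; plus helpers such as mandatory, parse_date_time,
# parse_interval, random_duration_for_max, DEFAULT_MAX_DATE, and an exceptions
# module that provides TrackConfigError.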
def __init__(self, track, params, **kwargs):
    self.logger = logging.getLogger(__name__)
    self._orig_args = [track, params, kwargs]
    self._params = params
    self.infinite = True
    self.workflows = []
    self.workflow_handlers = {}
    workflow_folder = os.path.join(track.root, params.get("workflows-folder", "workflows"))
    self.workflow = mandatory(params, "workflow", "composite")
    self.task_offset = mandatory(params, "task-offset", "composite")
    # prefer the seed passed by partition if more than 1 client
    self.random_seed = params.get(
        "random-seed",
        track.selected_challenge_or_default.parameters.get("random-seed"),
    )
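    # this params.get(..., <challenge fallback>) pattern recurs below: a
    # task-level parameter always takes precedence over the challenge-level one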
    # seed here is to ensure repeatable durations
    random.seed(self.random_seed)
    self.logger.info("Workflow [%s] is using seed [%s]", self.workflow, self.random_seed)
    self.number_of_tasks = track.selected_challenge_or_default.parameters.get("number-of-workflows")
    # for testing purposes only we allow a configurable now function
    self._utc_now = kwargs.get("utc_now", datetime.utcnow)
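    # capture a timezone-aware UTC timestamp as this run's reference "now"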
    self._init_date = self._utc_now().replace(tzinfo=timezone.utc)
    self._detailed_results = params.get(
        "detailed-results", track.selected_challenge_or_default.parameters.get("detailed-results", False)
    )
    self._workflow_target = params.get(
        "workflow-target",
        track.selected_challenge_or_default.parameters.get("workflow-target"),
    )
    self._request_cache = params.get("request-cache", track.selected_challenge_or_default.parameters.get("workflow-request-cache"))
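    # widest query time interval seen so far; timedelta.min guarantees that the
    # first real interval found in the loop below replaces it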
    self._max_time_interval = timedelta.min
    # sorted to ensure a natural sort order that respects numerics, e.g. "2.json" loads before "10.json"
    for action_filename in sorted(
        glob.glob(os.path.join(workflow_folder, self.workflow, "*.json"), recursive=True),
        key=self.natural_sort_key,
    ):
        self.logger.debug("Loading action from file [%s]", action_filename)
        with open(action_filename, "r") as action_file:
            action = json.load(action_file)
        if "id" not in action:
            raise exceptions.TrackConfigError(f'Action [{action_filename}] for [{self.workflow}] is missing an "id" key')
        if "requests" not in action:
            raise exceptions.TrackConfigError(f'Action [{action_filename}] for [{self.workflow}] is missing a "requests" key')
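        # a minimal valid action file therefore looks like (hypothetical example):
        #   {"id": "my-action", "requests": [...]}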
        action_id = action["id"]
        if action_id in self.workflow_handlers:
            raise exceptions.TrackConfigError(f"Action id [{action_id}] for [{self.workflow}] is duplicated. This must be unique")
        self.logger.debug(
            "Adding action with id [%s] to workflow [%s]",
            action_id,
            self.workflow,
        )
        self.workflows.append((action_id, action))
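        # actions are retained as (id, action) pairs in load order;
        # current_index below presumably steps through them in this same order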
        if self._detailed_results:
            # enable detailed results on every query
            self.set_detailed_results(action)
        if self._workflow_target:
            # override captured query targets with enabled integrations
            self.set_target_index(action)
        if self._request_cache is not None:
            # set request cache in search operations
            self.set_request_cache(action)
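        # fold any user-supplied request parameters into this action's requests;
        # booleans are stringified first, likely because they are ultimately
        # sent as URL query-string values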
        request_params = params.get("request-params", {})
        WorkflowSelectorParamSource.stringify_bool(request_params)
        if request_params:
            self.set_request_params(action, request_params)
        query_handlers = self.get_query_handlers(action, queries=[])
        time_interval = WorkflowSelectorParamSource.get_max_time_interval(query_handlers)
        if time_interval and time_interval > self._max_time_interval:
            self._max_time_interval = time_interval
        self.workflow_handlers[action_id] = query_handlers
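    # after the loop, _max_time_interval holds the widest query time span seen
    # across all loaded actions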
    if not self.workflows:
        raise exceptions.TrackConfigError(f"No actions loaded. [{workflow_folder}] contains no action files")
    self.current_index = 0
    self._min_date = parse_date_time(
        params.get(
            "query-min-date",
            track.selected_challenge_or_default.parameters.get("query-min-date"),
        )
    )
    self._max_date = parse_date_time(
        params.get(
            "query-max-date",
            track.selected_challenge_or_default.parameters.get("query-max-date"),
        )
    )
    self._max_date_start = parse_date_time(
        params.get(
            "query-max-date-start",
            track.selected_challenge_or_default.parameters.get("query-max-date-start"),
        )
    )
    if self._max_date and self._max_date_start:
        raise exceptions.TrackConfigError(
            f"Error in [{self.workflow}] configuration. Only one of 'query-max-date' and 'query-max-date-start' can be defined."
        )
    elif self._max_date is None and self._max_date_start is None:
        # the default must be set here, after the check above, or the conflict with query-max-date-start could not be detected
        self._max_date = parse_date_time(DEFAULT_MAX_DATE)
    self._avg_query_duration = parse_interval(
        params.get(
            "query-average-interval",
            track.selected_challenge_or_default.parameters.get("query-average-interval"),
        )
    )
    # minimum query duration in seconds (default: 15 minutes); overridable via kwargs for testing
    self._min_query_duration = kwargs.get("min_query_duration", 15 * 60)
    # using the max time interval we generate an upper bound for the duration - all other generated
    # durations will be scaled using this. It can be greater than (self._max_date - self._min_date)
    # or less than the min. Without both dates the bound is effectively unlimited.
    self.max_possible_duration = (datetime.max - datetime.min).total_seconds()
    if self._min_date and self._max_date:
        self.max_possible_duration = int((self._max_date - self._min_date).total_seconds())
    self.max_query_duration = random_duration_for_max(
        self._avg_query_duration,
        self._min_query_duration,
        self.max_possible_duration,
    )
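
# Hypothetical usage sketch (not part of this file): a Rally track typically
# wires a custom param source like this one to an operation in track.py, e.g.:
#
#     def register(registry):
#         registry.register_param_source("workflow-selector", WorkflowSelectorParamSource)
#
# and then references it from a composite operation, with illustrative values:
#
#     {"operation-type": "composite", "param-source": "workflow-selector",
#      "workflow": "my-workflow", "task-offset": 0}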