def __init__()

in elastic/shared/parameter_sources/workflow_selector.py [0:0]


    def __init__(self, track, params, **kwargs):
        """Load the actions of the selected workflow and derive the query date/duration settings.

        Every ``*.json`` file found directly under ``<track.root>/<workflows-folder>/<workflow>`` is
        parsed as one action. Each action is validated, optionally adjusted (detailed results,
        target index, request cache, request params) and registered together with its query handlers.

        Most settings are looked up in ``params`` first and fall back to the parameters of
        ``track.selected_challenge_or_default``.

        :param track: track object; ``track.root`` and ``track.selected_challenge_or_default.parameters``
            are read here.
        :param params: parameter mapping for this source. ``workflow`` and ``task-offset`` are read via
            ``mandatory(...)``; the remaining keys are optional.
        :param kwargs: test hooks. ``utc_now`` (callable returning the current time, defaults to
            ``datetime.utcnow``) and ``min_query_duration`` (seconds, defaults to 15 minutes) are read here.
        :raises exceptions.TrackConfigError: if an action file lacks an ``id`` or ``requests`` key, if an
            action id is duplicated, if no action files were loaded, or if both ``query-max-date`` and
            ``query-max-date-start`` are defined.
        """
        self.logger = logging.getLogger(__name__)
        self._orig_args = [track, params, kwargs]
        self._params = params
        self.infinite = True
        self.workflows = []
        self.workflow_handlers = {}
        workflow_folder = os.path.join(track.root, params.get("workflows-folder", "workflows"))
        self.workflow = mandatory(params, "workflow", "composite")
        self.task_offset = mandatory(params, "task-offset", "composite")
        # challenge-level parameters are the fallback for most of the settings below; resolved once
        # (after the mandatory checks above) instead of on every lookup
        challenge_params = track.selected_challenge_or_default.parameters
        # prefer the seed passed in params (by partition if more than 1 client, per the original note)
        # over the challenge-level seed
        self.random_seed = params.get("random-seed", challenge_params.get("random-seed"))
        # seed here is to ensure repeatable durations
        random.seed(self.random_seed)
        self.logger.info("Workflow [%s] is using seed [%s]", self.workflow, self.random_seed)
        self.number_of_tasks = challenge_params.get("number-of-workflows")
        # for testing purposes only we allow a configurable now function
        # NOTE(review): datetime.utcnow is deprecated since Python 3.12. It is kept because the callable
        # is stored on self and may be used elsewhere expecting naive datetimes - confirm before switching.
        self._utc_now = kwargs.get("utc_now", datetime.utcnow)
        self._init_date = self._utc_now().replace(tzinfo=timezone.utc)
        self._detailed_results = params.get("detailed-results", challenge_params.get("detailed-results", False))
        self._workflow_target = params.get("workflow-target", challenge_params.get("workflow-target"))
        # note the differing key names: "request-cache" in params, "workflow-request-cache" on the challenge
        self._request_cache = params.get("request-cache", challenge_params.get("workflow-request-cache"))
        self._max_time_interval = timedelta.min

        # the request params do not depend on the action, so they are resolved (and their booleans
        # stringified) once rather than for every action file
        request_params = params.get("request-params", {})
        WorkflowSelectorParamSource.stringify_bool(request_params)

        # sorted to ensure natural sort order that respects numerics
        action_filenames = sorted(
            glob.glob(os.path.join(workflow_folder, self.workflow, "*.json"), recursive=True),
            key=self.natural_sort_key,
        )
        for action_filename in action_filenames:
            self.logger.debug("Loading action from file [%s]", action_filename)
            # JSON is exchanged as UTF-8; do not depend on the platform default encoding.
            # The file is only needed for parsing, so it is closed before any further processing.
            with open(action_filename, "r", encoding="utf-8") as action_file:
                action = json.load(action_file)
            if "id" not in action:
                raise exceptions.TrackConfigError(f'Action [{action_filename}] for [{self.workflow}] is missing an "id" key')
            if "requests" not in action:
                raise exceptions.TrackConfigError(f'Action [{action_filename}] for [{self.workflow}] is missing a "requests" key')
            action_id = action["id"]
            if action_id in self.workflow_handlers:
                raise exceptions.TrackConfigError(f"Action id [{action_id}] for [{self.workflow}] is duplicated. This must be unique")
            self.logger.debug(
                "Adding action with id [%s] to workflow [%s]",
                action_id,
                self.workflow,
            )
            self.workflows.append((action_id, action))
            if self._detailed_results:
                # enable detailed results on every query
                self.set_detailed_results(action)
            if self._workflow_target:
                # override captured query targets with enabled integrations
                self.set_target_index(action)
            if self._request_cache is not None:
                # set request cache in search operations
                self.set_request_cache(action)
            if request_params:
                self.set_request_params(action, request_params)
            query_handlers = self.get_query_handlers(action, queries=[])
            # track the largest time interval seen across all actions of this workflow
            time_interval = WorkflowSelectorParamSource.get_max_time_interval(query_handlers)
            if time_interval and time_interval > self._max_time_interval:
                self._max_time_interval = time_interval
            self.workflow_handlers[action_id] = query_handlers

        if not self.workflows:
            raise exceptions.TrackConfigError(f"No actions loaded. [{workflow_folder}] contains no action files")
        self.current_index = 0
        self._min_date = parse_date_time(params.get("query-min-date", challenge_params.get("query-min-date")))
        self._max_date = parse_date_time(params.get("query-max-date", challenge_params.get("query-max-date")))
        self._max_date_start = parse_date_time(params.get("query-max-date-start", challenge_params.get("query-max-date-start")))
        if self._max_date and self._max_date_start:
            raise exceptions.TrackConfigError(
                f"Error in {self.workflow} configuration. " "Only one of 'query-max-date' and 'query-max-date-start' can be defined."
            )
        elif self._max_date is None and self._max_date_start is None:
            # must set default here, or else conflict check with query-max-date-start is not possible
            self._max_date = parse_date_time(DEFAULT_MAX_DATE)
        self._avg_query_duration = parse_interval(params.get("query-average-interval", challenge_params.get("query-average-interval")))
        # int, in seconds. for testing purposes
        self._min_query_duration = kwargs.get("min_query_duration", 15 * 60)

        # using the max time interval we generate an upper bound for the duration - all other generated durations will
        # be scaled using this. It can be greater than (self._max_date - self._min_date) or less than the min.
        # Without both bounds the widest representable datetime span is used (a float number of seconds);
        # with both bounds it is the whole number of seconds between them.
        self.max_possible_duration = (datetime.max - datetime.min).total_seconds()
        if self._min_date and self._max_date:
            self.max_possible_duration = int((self._max_date - self._min_date).total_seconds())
        self.max_query_duration = random_duration_for_max(
            self._avg_query_duration,
            self._min_query_duration,
            self.max_possible_duration,
        )