def _create_test_procedures()

in osbenchmark/workload/loader.py [0:0]


    def _create_test_procedures(self, workload_spec):
        """
        Builds the test procedures that are declared in a workload specification.

        Each test procedure spec is turned into a ``workload.TestProcedure``: its schedule entries are parsed into tasks
        (``parse_parallel`` for entries containing a "parallel" key, ``parse_task`` otherwise) and the workload-level
        "parameters" are merged with the test procedure's own "parameters" via ``collections.merge_dicts``.

        ``self._error`` is invoked when:

        * more than one test procedure is marked as default,
        * two test procedures share the same name,
        * a schedule contains two tasks with the same name, or
        * at least one test procedure exists but none of them is the default.

        :param workload_spec: The workload specification from which "operations", "parameters" and the test procedure
                              specs are read.
        :return: A list of ``workload.TestProcedure`` instances, in the order of the specs returned by
                 ``_get_test_procedure_specs``.
        """
        operations = self.parse_operations(self._r(workload_spec, "operations", mandatory=False, default_value=[]))
        shared_params = self._r(workload_spec, "parameters", mandatory=False, default_value={})
        specs, auto_generated = self._get_test_procedure_specs(workload_spec)
        # A lone test procedure is always both the default and the selected one, no matter what the user has specified.
        is_only_one = len(specs) == 1

        result = []
        seen_names = set()
        current_default = None

        for spec in specs:
            name = self._r(spec, "name", error_ctx="test_procedures")
            description = self._r(spec, "description", error_ctx=name, mandatory=False)
            user_info = self._r(spec, "user-info", error_ctx=name, mandatory=False)
            own_params = self._r(spec, "parameters", error_ctx=name, mandatory=False, default_value={})
            meta_data = self._r(spec, "meta", error_ctx=name, mandatory=False)
            is_default = is_only_one or self._r(spec, "default", error_ctx=name, mandatory=False)
            is_selected = is_only_one or self.selected_test_procedure == name

            # Test procedure level validation: at most one default and unique names.
            if is_default and current_default is not None:
                self._error("Both '%s' and '%s' are defined as default test_procedures. Please define only one of them as default."
                            % (current_default.name, name))
            if name in seen_names:
                self._error("Duplicate test_procedure with name '%s'." % name)
            seen_names.add(name)

            # Entries with a "parallel" key are parsed via parse_parallel, everything else via parse_task.
            schedule = [
                self.parse_parallel(entry["parallel"], operations, name) if "parallel" in entry
                else self.parse_task(entry, operations, name)
                for entry in self._r(spec, "schedule", error_ctx=name)
            ]

            # Task names must be unique within a test procedure (duplicates are confusing / misleading in
            # results_publishing). Every schedule element is iterated to reach the tasks it yields.
            seen_task_names = set()
            for element in schedule:
                for sub_task in element:
                    task_name = sub_task.name
                    if task_name in seen_task_names:
                        self._error("TestProcedure '%s' contains multiple tasks with the name '%s'. Please use the task's name property to "
                                    "assign a unique name for each task." % (name, task_name))
                    seen_task_names.add(task_name)

            # Combine the workload-wide parameters with the ones defined on this test procedure.
            merged_params = dict(collections.merge_dicts(shared_params, own_params))

            procedure = workload.TestProcedure(name=name,
                                               description=description,
                                               user_info=user_info,
                                               meta_data=meta_data,
                                               parameters=merged_params,
                                               schedule=schedule,
                                               default=is_default,
                                               selected=is_selected,
                                               auto_generated=auto_generated)
            result.append(procedure)
            if is_default:
                current_default = procedure

        # With at least one test procedure present, exactly one of them has to be the default.
        if result and current_default is None:
            self._error(
                "No default test_procedure specified. Please edit the workload and add \"default\": true to one of the test_procedures %s."
                % ", ".join(procedure.name for procedure in result))
        return result