in osbenchmark/workload/loader.py [0:0]
def _create_test_procedures(self, workload_spec):
    """
    Build the test procedures declared in a workload specification.

    Reads the optional ``operations`` and ``parameters`` entries of ``workload_spec``, obtains the
    test procedure specifications via ``self._get_test_procedure_specs`` and turns each of them into a
    ``workload.TestProcedure``. While doing so it reports (through ``self._error``):

    * more than one test procedure flagged as default,
    * two test procedures sharing the same name,
    * two tasks with the same name inside one test procedure's schedule,
    * a non-empty result in which no test procedure ended up as default.

    NOTE(review): ``self._error`` is assumed to raise; it is defined outside this block - confirm.

    :param workload_spec: The workload specification to read from (accessed through ``self._r``).
    :return: A list of ``workload.TestProcedure`` instances, in the order of their specifications.
    """
    operations = self.parse_operations(self._r(workload_spec, "operations", mandatory=False, default_value=[]))
    workload_level_params = self._r(workload_spec, "parameters", mandatory=False, default_value={})
    specs, auto_generated = self._get_test_procedure_specs(workload_spec)
    # a lone test_procedure is implicitly both the default and the selected one
    is_only_procedure = len(specs) == 1

    result = []
    seen_procedure_names = set()
    default_procedure = None

    for spec in specs:
        name = self._r(spec, "name", error_ctx="test_procedures")
        description = self._r(spec, "description", error_ctx=name, mandatory=False)
        user_info = self._r(spec, "user-info", error_ctx=name, mandatory=False)
        procedure_level_params = self._r(spec, "parameters", error_ctx=name, mandatory=False, default_value={})
        meta_data = self._r(spec, "meta", error_ctx=name, mandatory=False)

        # the "default" property is only consulted when there is more than one test_procedure,
        # i.e. the user's setting is ignored for a single test_procedure
        is_default = is_only_procedure or self._r(spec, "default", error_ctx=name, mandatory=False)
        is_selected = is_only_procedure or self.selected_test_procedure == name

        if is_default and default_procedure is not None:
            self._error("Both '%s' and '%s' are defined as default test_procedures. Please define only one of them as default."
                        % (default_procedure.name, name))
        if name in seen_procedure_names:
            self._error("Duplicate test_procedure with name '%s'." % name)
        seen_procedure_names.add(name)

        # every schedule entry is either a "parallel" element or a plain task
        schedule = [
            self.parse_parallel(entry["parallel"], operations, name)
            if "parallel" in entry
            else self.parse_task(entry, operations, name)
            for entry in self._r(spec, "schedule", error_ctx=name)
        ]

        # task names have to be unique per test_procedure; duplicates would be confusing / misleading
        # in results_publishing. Each schedule element is iterated to reach its (sub-)tasks.
        seen_task_names = set()
        for schedule_element in schedule:
            for leaf_task in schedule_element:
                task_name = leaf_task.name
                if task_name in seen_task_names:
                    self._error("TestProcedure '%s' contains multiple tasks with the name '%s'. Please use the task's name property to "
                                "assign a unique name for each task." % (name, task_name))
                seen_task_names.add(task_name)

        # combine the workload-level parameters with the ones of this test_procedure
        merged_params = dict(collections.merge_dicts(workload_level_params, procedure_level_params))

        procedure = workload.TestProcedure(
            name=name,
            parameters=merged_params,
            meta_data=meta_data,
            description=description,
            user_info=user_info,
            default=is_default,
            selected=is_selected,
            auto_generated=auto_generated,
            schedule=schedule,
        )
        if is_default:
            default_procedure = procedure
        result.append(procedure)

    # an empty list is fine, but as soon as there are test_procedures one of them has to be the default
    if result and default_procedure is None:
        procedure_names = ", ".join([procedure.name for procedure in result])
        self._error(
            "No default test_procedure specified. Please edit the workload and add \"default\": true to one of the test_procedures %s."
            % procedure_names)
    return result