in lib/ramble/ramble/application.py [0:0]
def _analyze_experiments(self, workspace, app_inst=None):
"""Perform experiment analysis.
This method will build up the fom_values dictionary. Its structure is:
fom_values[context][fom]
A fom can show up in any number of explicit contexts (including zero).
If the number of explicit contexts is zero, the fom is associated with
the default '(null)' context.
Success is determined at analysis time as well. This happens by checking if:
- At least one FOM is extracted
AND
- Any defined success criteria pass
Success criteria are defined within the application.py, but can also be
injected in a workspace config.
"""
if self.get_status() == ExperimentStatus.UNKNOWN and not workspace.dry_run:
logger.warn(f"Experiment has status {self.get_status()}. Skipping analysis..\n")
return
def format_context(context_match, context_format):
context_val = {}
if isinstance(context_format, str):
for group in string.Formatter().parse(context_format):
if group[1]:
context_val[group[1]] = context_match[group[1]]
context_string = context_format.format(**context_val)
return context_string
fom_values = {}
criteria_list = workspace.success_list
criteria_list.reset()
files, contexts, foms = self._analysis_dicts(criteria_list)
exp_lock = self.experiment_lock()
# Iterate over files. We already know they exist
with lk.ReadTransaction(exp_lock):
for file, file_conf in files.items():
# Start with no active contexts in a file.
active_contexts = {}
logger.debug(f"Reading log file: {file}")
if not os.path.exists(file):
logger.debug(f"Skipping analysis of non-existent file: {file}")
continue
per_file_crit_objs = [
criteria_list.find_criteria(c) for c in file_conf["success_criteria"]
]
with open(file) as f:
for line in f.readlines():
new_per_file_crit_objs = []
for crit_obj in per_file_crit_objs:
if crit_obj.passed(line, self):
crit_obj.mark_found()
elif crit_obj.anti_matched(line):
crit_obj.mark_anti_found()
else:
new_per_file_crit_objs.append(crit_obj)
per_file_crit_objs = new_per_file_crit_objs
for context in file_conf["contexts"]:
context_conf = contexts[context]
context_match = context_conf["regex"].match(line)
if context_match:
context_name = format_context(
context_match, context_conf["format"]
)
logger.debug("Line was: %s" % line)
logger.debug(f" Context match {context} -- {context_name}")
active_contexts[context] = context_name
if context_name not in fom_values:
fom_values[context_name] = {}
for fom in file_conf["foms"]:
fom_conf = foms[fom]
fom_match = fom_conf["regex"].match(line)
if fom_match:
fom_vars = {}
for k, v in fom_match.groupdict().items():
fom_vars[k] = v
if fom_conf["fom_name_expanded"] is not None:
fom_name = fom_conf["fom_name_expanded"]
else:
fom_name = self.expander.expand_var(fom, extra_vars=fom_vars)
if fom_conf["group"] in fom_conf["regex"].groupindex:
logger.debug(" --- Matched fom %s" % fom_name)
fom_contexts = []
if fom_conf["contexts"]:
for context in fom_conf["contexts"]:
context_name = (
active_contexts[context]
if context in active_contexts
else _NULL_CONTEXT
)
fom_contexts.append(context_name)
else:
fom_contexts.append(_NULL_CONTEXT)
for context in fom_contexts:
if context not in fom_values:
fom_values[context] = {}
fom_val = fom_match.group(fom_conf["group"])
if fom_conf["units_expanded"] is not None:
fom_unit = fom_conf["units"]
else:
fom_unit = self.expander.expand_var(
fom_conf["units"], extra_vars=fom_vars
)
fom_values[context][fom_name] = {
"value": fom_val,
"units": fom_unit,
"origin": fom_conf["origin"],
"origin_type": fom_conf["origin_type"],
"fom_type": fom_conf["fom_type"],
}
# Test all non-file based success criteria
for criteria_obj in criteria_list.all_criteria():
if criteria_obj.file is None:
if criteria_obj.passed(app_inst=self, fom_values=fom_values):
criteria_obj.mark_found()
success = False
for fom in fom_values.values():
for value in fom.values():
if "origin_type" in value and value["origin_type"] == "application":
success = True
success = success and criteria_list.passed()
status = self.get_status()
if status == ExperimentStatus.SUCCESS and not success:
status = ExperimentStatus.FAILED
elif success:
status = ExperimentStatus.SUCCESS
# When workflow_manager is present, only use app_status when workflow is completed or
# unresolved.
if self.workflow_manager is not None:
wm_status = self.workflow_manager.get_status(workspace)
if not (
wm_status is None
or wm_status in [ExperimentStatus.COMPLETE, ExperimentStatus.UNRESOLVED]
):
status = wm_status
self.set_status(status)
self._init_result()
for context, fom_map in fom_values.items():
context_map = {
"name": context,
"foms": [],
"display_name": _get_context_display_name(context),
}
for fom_name, fom in fom_map.items():
fom_copy = fom.copy()
fom_copy["name"] = fom_name
context_map["foms"].append(fom_copy)
if context == _NULL_CONTEXT:
self.result.contexts.insert(0, context_map)
else:
self.result.contexts.append(context_map)