in lib/ramble/ramble/application.py [0:0]
def _analysis_dicts(self, criteria_list):
    """Extract files that need to be analyzed.

    Register the success criteria on ``criteria_list``, process the
    figures of merit, and return the manipulated dictionaries to allow
    them to be extracted.

    The steps performed are:

    1. Flush the ``application_definition`` scope of ``criteria_list``
       (and the ``modifier_definition`` scope when modifiers are present),
       then re-add every criterion whose mode is ``string`` or
       ``fom_comparison``. Criteria with any other mode are not added here.
    2. Add the ``_application_function`` criterion to the
       ``application_definition`` scope.
    3. Attach each criterion name to the file it refers to. A file is only
       registered at this point if it exists on disk.
    4. Collect figure of merit definitions from the application, every
       modifier instance, and the workflow manager (when one is set).
       Later sources overwrite earlier ones that share a name.
    5. Build the per-file, per-context, and per-figure-of-merit
       dictionaries. Files referenced by a figure of merit are always
       registered, whether or not they exist.

    Args:
        criteria_list: Success criteria container. It is modified in place
            through ``flush_scope`` and ``add_criteria``.

    Returns:
        files (dict): All files that need to be processed
        contexts (dict): Any contexts that have been defined
        foms (dict): All figures of merit that need to be extracted
    """

    def _resolve_log_path(raw_path):
        """Expand ``raw_path`` and anchor relative results in the run directory."""
        log_path = self.expander.expand_var(raw_path)
        # Ensure log path is absolute. If not, prepend the experiment run directory
        if not os.path.isabs(log_path) and self.expander.experiment_run_dir not in log_path:
            log_path = os.path.join(self.expander.experiment_run_dir, log_path)
        return log_path

    def _import_fom_definitions(source, origin_type, definitions, *expand_args):
        """Copy the figures of merit of ``source`` into ``definitions``.

        Lists and ``FomType`` values are copied; every other attribute is
        expanded, with ``expand_args`` forwarded to ``expand_var``.
        """
        for fom_name, fom_def in source.figures_of_merit.items():
            imported = {"origin": f"{source}", "origin_type": origin_type}
            definitions[fom_name] = imported
            for attr, value in fom_def.items():
                if isinstance(value, (list, FomType)):
                    imported[attr] = value.copy()
                else:
                    imported[attr] = self.expander.expand_var(value, *expand_args)

    # Defined once here instead of being re-created for every figure of merit.
    def _try_expand_var_or_none(var: str, expander):
        """Return the expansion of ``var``, or None if it raises a syntax error."""
        try:
            return expander.expand_var(var, allow_passthrough=False)
        except ramble.expander.RambleSyntaxError:
            return None

    files = {}
    contexts = {}
    foms = {}

    # Add the application defined criteria
    criteria_list.flush_scope("application_definition")

    success_lists = [
        ("application_definition", self.success_criteria),
    ]

    logger.debug(f" Number of modifiers are: {len(self._modifier_instances)}")
    if self._modifier_instances:
        criteria_list.flush_scope("modifier_definition")
    for mod in self._modifier_instances:
        success_lists.append(("modifier_definition", mod.success_criteria))

    for success_scope, success_list in success_lists:
        for criteria, conf in success_list.items():
            if conf["mode"] == "string":
                match = (
                    self.expander.expand_var(conf["match"])
                    if conf["match"] is not None
                    else None
                )
                anti_match = (
                    self.expander.expand_var(conf["anti_match"])
                    if conf["anti_match"] is not None
                    else None
                )
                criteria_list.add_criteria(
                    success_scope,
                    criteria,
                    mode=conf["mode"],
                    match=match,
                    file=conf["file"],
                    anti_match=anti_match,
                )
            elif conf["mode"] == "fom_comparison":
                criteria_list.add_criteria(
                    success_scope,
                    criteria,
                    conf["mode"],
                    fom_name=conf["fom_name"],
                    fom_context=conf["fom_context"],
                    formula=conf["formula"],
                )

    criteria_list.add_criteria(
        scope="application_definition",
        name="_application_function",
        mode="application_function",
    )

    # Extract file paths for all criteria
    for criteria in criteria_list.all_criteria():
        log_path = _resolve_log_path(criteria.file)

        # Criteria never create entries for missing files; they are only
        # attached to files that exist or were registered earlier in this loop.
        if log_path not in files and os.path.exists(log_path):
            files[log_path] = self._new_file_dict()

        if log_path in files:
            files[log_path]["success_criteria"].append(criteria.name)

    # Remap fom / context / file data
    # Could push this into the language features in the future
    #
    # NOTE(review): ``copy()`` is shallow, so the origin tags below are written
    # into the dictionaries held by ``self.figures_of_merit`` as well. This is
    # kept as-is because other code may rely on it -- confirm whether it is
    # intended.
    fom_definitions = self.figures_of_merit.copy()
    for fom_def in fom_definitions.values():
        fom_def["origin"] = self.name
        fom_def["origin_type"] = "application"

    fom_contexts = self.figure_of_merit_contexts.copy()
    for mod in self._modifier_instances:
        fom_contexts.update(mod.figure_of_merit_contexts)

        # Modifier attributes are expanded with the modifier's own variables.
        mod_vars = mod.modded_variables(self)
        _import_fom_definitions(mod, "modifier", fom_definitions, mod_vars)

    if self.workflow_manager is not None:
        fom_contexts.update(self.workflow_manager.figure_of_merit_contexts)
        _import_fom_definitions(self.workflow_manager, "workflow_manager", fom_definitions)

    for fom, conf in fom_definitions.items():
        log_path = _resolve_log_path(conf["log_file"])

        # Unlike success criteria, figures of merit always register their file.
        if log_path not in files:
            files[log_path] = self._new_file_dict()

        logger.debug("Log = %s" % log_path)
        logger.debug("Conf = %s" % conf)
        if conf["contexts"]:
            files[log_path]["contexts"].extend(conf["contexts"])
        files[log_path]["foms"].append(fom)

        foms[fom] = {
            "regex": re.compile(r"%s" % self.expander.expand_var(conf["regex"])),
            "contexts": [],
            "group": conf["group_name"],
            "units": conf["units"],
            "origin": conf["origin"],
            "origin_type": conf["origin_type"],
            "fom_type": conf["fom_type"].to_dict(),
            # If expansion works (i.e., it doesn't rely on the matched fom groups),
            # then cache it here to avoid repeated expansion later.
            "units_expanded": _try_expand_var_or_none(conf["units"], self.expander),
            "fom_name_expanded": _try_expand_var_or_none(fom, self.expander),
        }

        if conf["contexts"]:
            foms[fom]["contexts"].extend(conf["contexts"])

            for context in conf["contexts"]:
                regex_str = self.expander.expand_var(fom_contexts[context]["regex"])
                format_str = fom_contexts[context]["output_format"]
                contexts[context] = {
                    "regex": re.compile(r"%s" % regex_str),
                    "format": format_str,
                }

    return files, contexts, foms