def _analysis_dicts()

in lib/ramble/ramble/application.py [0:0]


    def _analysis_dicts(self, criteria_list):
        """Extract files that need to be analyzed.

        Process figures_of_merit, and return the manipulated dictionaries
        to allow them to be extracted.

        Additionally, ensure the success criteria list is complete.

        Returns:
            files (dict): All files that need to be processed
            contexts (dict): Any contexts that have been defined
            foms (dict): All figures of merit that need to be extracted
        """

        files = {}
        contexts = {}
        foms = {}

        # Add the application defined criteria
        criteria_list.flush_scope("application_definition")

        success_lists = [
            ("application_definition", self.success_criteria),
        ]

        logger.debug(f" Number of modifiers are: {len(self._modifier_instances)}")
        if self._modifier_instances:
            criteria_list.flush_scope("modifier_definition")
        for mod in self._modifier_instances:
            success_lists.append(("modifier_definition", mod.success_criteria))

        for success_scope, success_list in success_lists:
            for criteria, conf in success_list.items():
                if conf["mode"] == "string":
                    match = (
                        self.expander.expand_var(conf["match"])
                        if conf["match"] is not None
                        else None
                    )
                    anti_match = (
                        self.expander.expand_var(conf["anti_match"])
                        if conf["anti_match"] is not None
                        else None
                    )
                    criteria_list.add_criteria(
                        success_scope,
                        criteria,
                        mode=conf["mode"],
                        match=match,
                        file=conf["file"],
                        anti_match=anti_match,
                    )
                elif conf["mode"] == "fom_comparison":
                    criteria_list.add_criteria(
                        success_scope,
                        criteria,
                        conf["mode"],
                        fom_name=conf["fom_name"],
                        fom_context=conf["fom_context"],
                        formula=conf["formula"],
                    )

        criteria_list.add_criteria(
            scope="application_definition",
            name="_application_function",
            mode="application_function",
        )

        # Extract file paths for all criteria
        for criteria in criteria_list.all_criteria():
            log_path = self.expander.expand_var(criteria.file)

            # Ensure log path is absolute. If not, prepend the experiment run directory
            if not os.path.isabs(log_path) and self.expander.experiment_run_dir not in log_path:
                log_path = os.path.join(self.expander.experiment_run_dir, log_path)

            if log_path not in files and os.path.exists(log_path):
                files[log_path] = self._new_file_dict()

            if log_path in files:
                files[log_path]["success_criteria"].append(criteria.name)

        # Remap fom / context / file data
        # Could push this into the language features in the future
        fom_definitions = self.figures_of_merit.copy()
        for fom, fom_def in fom_definitions.items():
            fom_def["origin"] = self.name
            fom_def["origin_type"] = "application"

        fom_contexts = self.figure_of_merit_contexts.copy()
        for mod in self._modifier_instances:
            fom_contexts.update(mod.figure_of_merit_contexts)

            mod_vars = mod.modded_variables(self)

            for fom, fom_def in mod.figures_of_merit.items():
                fom_definitions[fom] = {"origin": f"{mod}", "origin_type": "modifier"}
                for attr in fom_def.keys():
                    if isinstance(fom_def[attr], (list, FomType)):
                        fom_definitions[fom][attr] = fom_def[attr].copy()
                    else:
                        fom_definitions[fom][attr] = self.expander.expand_var(
                            fom_def[attr], mod_vars
                        )

        if self.workflow_manager is not None:
            fom_contexts.update(self.workflow_manager.figure_of_merit_contexts)
            for fom, fom_def in self.workflow_manager.figures_of_merit.items():
                fom_definitions[fom] = {
                    "origin": f"{self.workflow_manager}",
                    "origin_type": "workflow_manager",
                }
                for attr in fom_def.keys():
                    if isinstance(fom_def[attr], (list, FomType)):
                        fom_definitions[fom][attr] = fom_def[attr].copy()
                    else:
                        fom_definitions[fom][attr] = self.expander.expand_var(fom_def[attr])

        for fom, conf in fom_definitions.items():
            log_path = self.expander.expand_var(conf["log_file"])

            # Ensure log path is absolute. If not, prepend the experiment run directory
            if not os.path.isabs(log_path) and self.expander.experiment_run_dir not in log_path:
                log_path = os.path.join(self.expander.experiment_run_dir, log_path)

            if log_path not in files:
                files[log_path] = self._new_file_dict()

            logger.debug("Log = %s" % log_path)
            logger.debug("Conf = %s" % conf)
            if conf["contexts"]:
                files[log_path]["contexts"].extend(conf["contexts"])
            files[log_path]["foms"].append(fom)

            def _try_expand_var_or_none(var: str, expander):
                try:
                    return expander.expand_var(var, allow_passthrough=False)
                except ramble.expander.RambleSyntaxError:
                    return None

            foms[fom] = {
                "regex": re.compile(r"%s" % self.expander.expand_var(conf["regex"])),
                "contexts": [],
                "group": conf["group_name"],
                "units": conf["units"],
                "origin": conf["origin"],
                "origin_type": conf["origin_type"],
                "fom_type": conf["fom_type"].to_dict(),
                # If expansion works (i.e., it doesn't rely on the matched fom groups),
                # then cache it here to avoid repeated expansion later.
                "units_expanded": _try_expand_var_or_none(conf["units"], self.expander),
                "fom_name_expanded": _try_expand_var_or_none(fom, self.expander),
            }
            if conf["contexts"]:
                foms[fom]["contexts"].extend(conf["contexts"])
                for context in conf["contexts"]:
                    regex_str = self.expander.expand_var(fom_contexts[context]["regex"])
                    format_str = fom_contexts[context]["output_format"]
                    contexts[context] = {
                        "regex": re.compile(r"%s" % regex_str),
                        "format": format_str,
                    }

        return files, contexts, foms