def _analyze_experiments()

in lib/ramble/ramble/application.py [0:0]


    def _analyze_experiments(self, workspace, app_inst=None):
        """Perform experiment analysis.

        This method will build up the fom_values dictionary. Its structure is:

        fom_values[context][fom]

        A fom can show up in any number of explicit contexts (including zero).
        If the number of explicit contexts is zero, the fom is associated with
        the default null context (``_NULL_CONTEXT``).

        Success is determined at analysis time as well. This happens by checking if:
         - At least one FOM with an ``origin_type`` of "application" is extracted
         AND
         - The workspace's success criteria list reports that it passed

        Success criteria are defined within the application.py, but can also be
        injected in a workspace config.

        After analysis the experiment status is updated, the result object is
        (re)initialized through ``self._init_result()``, and one entry per
        context is added to ``self.result.contexts`` (the null context is
        inserted first).

        Args:
            workspace: Workspace being analyzed. This method reads its
                ``dry_run`` flag and ``success_list``, and passes it to the
                workflow manager's ``get_status``.
            app_inst: Not used by this method; kept so existing callers that
                pass it continue to work.

        Returns:
            None. Returns early, without analyzing, when the experiment status
            is ``ExperimentStatus.UNKNOWN`` and the workspace is not a dry run.
        """

        if self.get_status() == ExperimentStatus.UNKNOWN and not workspace.dry_run:
            logger.warn(f"Experiment has status {self.get_status()}. Skipping analysis..\n")
            return

        def format_context(context_match, context_format):
            """Render a context name from a regex match and a format string."""
            context_val = {}
            if isinstance(context_format, str):
                # Only look up the named groups the format string references.
                for group in string.Formatter().parse(context_format):
                    if group[1]:
                        context_val[group[1]] = context_match[group[1]]

            return context_format.format(**context_val)

        fom_values = {}

        criteria_list = workspace.success_list
        criteria_list.reset()

        files, contexts, foms = self._analysis_dicts(criteria_list)

        exp_lock = self.experiment_lock()

        # Iterate over the files to analyze; files that do not exist are skipped.
        with lk.ReadTransaction(exp_lock):
            for file, file_conf in files.items():

                # Start with no active contexts in a file.
                active_contexts = {}
                logger.debug(f"Reading log file: {file}")

                if not os.path.exists(file):
                    logger.debug(f"Skipping analysis of non-existent file: {file}")
                    continue

                # Criteria still waiting for a (anti-)match in this file.
                per_file_crit_objs = [
                    criteria_list.find_criteria(c) for c in file_conf["success_criteria"]
                ]

                with open(file) as f:
                    # Stream the file line by line instead of loading it whole.
                    for line in f:
                        # Criteria that neither matched nor anti-matched on
                        # this line stay pending for the following lines.
                        pending_crit_objs = []
                        for crit_obj in per_file_crit_objs:
                            if crit_obj.passed(line, self):
                                crit_obj.mark_found()
                            elif crit_obj.anti_matched(line):
                                crit_obj.mark_anti_found()
                            else:
                                pending_crit_objs.append(crit_obj)
                        per_file_crit_objs = pending_crit_objs

                        for context in file_conf["contexts"]:
                            context_conf = contexts[context]
                            context_match = context_conf["regex"].match(line)

                            if context_match:
                                context_name = format_context(
                                    context_match, context_conf["format"]
                                )
                                logger.debug("Line was: %s" % line)
                                logger.debug(f" Context match {context} -- {context_name}")

                                # The most recent match is the active value
                                # of this context for subsequent lines.
                                active_contexts[context] = context_name

                                if context_name not in fom_values:
                                    fom_values[context_name] = {}

                        for fom in file_conf["foms"]:
                            fom_conf = foms[fom]
                            fom_match = fom_conf["regex"].match(line)

                            if not fom_match:
                                continue

                            # Named groups are made available as extra
                            # variables when expanding the name and units.
                            fom_vars = fom_match.groupdict()

                            if fom_conf["fom_name_expanded"] is not None:
                                fom_name = fom_conf["fom_name_expanded"]
                            else:
                                fom_name = self.expander.expand_var(fom, extra_vars=fom_vars)

                            # Only record a value when the configured group is
                            # actually part of the regex. Recording outside of
                            # this guard would reuse a stale (or undefined)
                            # context list and fail on the group lookup.
                            if fom_conf["group"] not in fom_conf["regex"].groupindex:
                                continue

                            logger.debug(" --- Matched fom %s" % fom_name)

                            if fom_conf["contexts"]:
                                # Contexts that have not been seen yet in this
                                # file fall back to the null context.
                                fom_contexts = [
                                    active_contexts.get(context, _NULL_CONTEXT)
                                    for context in fom_conf["contexts"]
                                ]
                            else:
                                fom_contexts = [_NULL_CONTEXT]

                            # Value and units do not depend on the context, so
                            # they are computed once per match.
                            fom_val = fom_match.group(fom_conf["group"])
                            if fom_conf["units_expanded"] is not None:
                                # Use the pre-expanded units, mirroring the
                                # handling of "fom_name_expanded" above.
                                fom_unit = fom_conf["units_expanded"]
                            else:
                                fom_unit = self.expander.expand_var(
                                    fom_conf["units"], extra_vars=fom_vars
                                )

                            for context in fom_contexts:
                                if context not in fom_values:
                                    fom_values[context] = {}
                                # A separate dict per context so entries are
                                # never shared between contexts.
                                fom_values[context][fom_name] = {
                                    "value": fom_val,
                                    "units": fom_unit,
                                    "origin": fom_conf["origin"],
                                    "origin_type": fom_conf["origin_type"],
                                    "fom_type": fom_conf["fom_type"],
                                }

        # Test all non-file based success criteria
        for criteria_obj in criteria_list.all_criteria():
            if criteria_obj.file is None:
                if criteria_obj.passed(app_inst=self, fom_values=fom_values):
                    criteria_obj.mark_found()

        # Success requires at least one FOM that originates from the
        # application itself, and passing success criteria.
        has_application_fom = any(
            value.get("origin_type") == "application"
            for fom in fom_values.values()
            for value in fom.values()
        )
        success = has_application_fom and criteria_list.passed()

        # A previously successful experiment is demoted when analysis fails;
        # any other existing status is kept unless analysis succeeds.
        status = self.get_status()
        if status == ExperimentStatus.SUCCESS and not success:
            status = ExperimentStatus.FAILED
        elif success:
            status = ExperimentStatus.SUCCESS

        # When workflow_manager is present, only use app_status when workflow is completed or
        # unresolved.
        if self.workflow_manager is not None:
            wm_status = self.workflow_manager.get_status(workspace)
            if not (
                wm_status is None
                or wm_status in [ExperimentStatus.COMPLETE, ExperimentStatus.UNRESOLVED]
            ):
                status = wm_status

        self.set_status(status)

        self._init_result()

        for context, fom_map in fom_values.items():
            context_map = {
                "name": context,
                "foms": [],
                "display_name": _get_context_display_name(context),
            }

            for fom_name, fom in fom_map.items():
                # Copy so that adding the name does not mutate fom_values.
                fom_copy = fom.copy()
                fom_copy["name"] = fom_name
                context_map["foms"].append(fom_copy)

            # Keep the null context at the front of the result's context list.
            if context == _NULL_CONTEXT:
                self.result.contexts.insert(0, context_map)
            else:
                self.result.contexts.append(context_map)