# def prepare_data()
#
# in lib/ramble/ramble/reports.py [0:0]


# Variable names ending in "_dir" or "_path" are excluded from the report columns.
_IGNORED_VAR_SUFFIX_RE = re.compile(r"_(?:dir|path)$")


def _repeat_experiment_names(exp_name, n_repeats):
    """Return the names of the ``n_repeats`` repeat children of ``exp_name``.

    The repeat index ``.1`` .. ``.<n_repeats>`` is appended to the name, or inserted
    just before the first ``.chain`` when the name contains one.
    """
    chain_idx = exp_name.find(".chain")
    if chain_idx < 0:
        head, tail = exp_name, ""
    else:
        head, tail = exp_name[:chain_idx], exp_name[chain_idx:]
    return [f"{head}.{n}{tail}" for n in range(1, n_repeats + 1)]


def _flatten_experiment(exp):
    """Yield one flat row dict per FOM per context of a single experiment."""
    # Exclude vars that aren't needed for analysis, mainly paths and commands
    vars_to_ignore = (
        keywords.batch_submit,
        keywords.log_file,
        "command",
        "execute_experiment",
    )

    def reportable(variables):
        """Return the (key, value) pairs of ``variables`` that are kept as columns."""
        return [
            (key, value)
            for key, value in variables.items()
            if key not in vars_to_ignore and not _IGNORED_VAR_SUFFIX_RE.search(key)
        ]

    contexts = exp["CONTEXTS"]

    # Every row starts from the experiment's own entries minus CONTEXTS. Dropping
    # CONTEXTS here, once, keeps the per-row deep copy below from duplicating every
    # context and FOM of the experiment only to discard them again.
    exp_base = {key: value for key, value in exp.items() if key != "CONTEXTS"}

    for context in contexts:
        for fom in context["foms"]:
            # Expand to one row/FOM/context w/ a copy of the experiment vars and metadata.
            # Deep copy so nested values carried over from the experiment are not
            # shared between rows.
            row = copy.deepcopy(exp_base)
            row[ReportVars.CONTEXT.value] = context["name"]

            for name, val in fom.items():
                if name in _FOM_DICT_MAPPING:
                    row[_FOM_DICT_MAPPING[name]] = val
                elif name == "fom_type":
                    row["fom_type"] = FomType.from_str(val["name"])
                    row[ReportVars.BETTER_DIRECTION.value] = BetterDirection.from_str(
                        val[ReportVars.BETTER_DIRECTION.value]
                    )

                # older data exports may not have fom_type stored.
                # This is checked after each field rather than once after the loop on
                # purpose: the placeholder is inserted as soon as the first field has
                # been handled, which fixes the position of these two keys in the row,
                # and a "fom_type" entry seen later overwrites the values in place.
                if "fom_type" not in row:
                    row["fom_type"] = FomType.UNDEFINED
                    row[ReportVars.BETTER_DIRECTION.value] = BetterDirection.INDETERMINATE

            # Expanded variables become columns under their own name and may replace
            # an entry of the same name set above; raw variables get a "RAW" prefix.
            for key, value in reportable(exp["RAMBLE_VARIABLES"]):
                row[key] = value
            for key, value in reportable(exp["RAMBLE_RAW_VARIABLES"]):
                row["RAW" + key] = value

            yield row


def prepare_data(results: dict, where_query) -> pd.DataFrame:
    """Creates a Pandas DataFrame from the results dictionary to use for reports.

    Transforms nested results dictionary into a flat dataframe. Each row equals
    one FOM from one context of one experiment, with columns including
    associated experiment variables (except paths and commands).

    Only experiments whose ``RAMBLE_STATUS`` is ``"SUCCESS"`` are imported. Repeat
    children are skipped, either because ``is_repeat_child`` reports them as such or
    because their name was generated from a previously imported base experiment with
    ``N_REPEATS`` greater than zero.

    Args:
        results: Results dictionary. ``results["experiments"]`` is iterated; each
            imported experiment must provide ``name``, ``RAMBLE_STATUS``,
            ``CONTEXTS``, ``RAMBLE_VARIABLES`` and ``RAMBLE_RAW_VARIABLES``.
        where_query: Expression handed to ``DataFrame.query`` to down select rows.
            Skipped when falsy (e.g. ``None`` or an empty string).

    Returns:
        DataFrame with one row per FOM; empty when nothing was imported.
    """
    rows = []
    # Names of repeat experiments to skip; a set because it is only used for lookups.
    skip_exps = set()

    for exp in results["experiments"]:
        if exp["name"] in skip_exps or is_repeat_child(exp):
            logger.debug(f"Skipping import of experiment {exp['name']}")
            continue
        if exp["RAMBLE_STATUS"] != "SUCCESS":
            continue

        logger.debug(f"Importing experiment {exp['name']}")

        # For repeat experiments, use summary stats from base exp and skip repeats.
        # Repeats are sequenced after base exp, so their names are generated here in
        # order to skip them explicitly when they come up later in the iteration.
        n_repeats = exp.get("N_REPEATS", 0)
        if n_repeats > 0:
            skip_exps.update(_repeat_experiment_names(exp["name"], n_repeats))

        rows.extend(_flatten_experiment(exp))

    results_df = pd.DataFrame.from_dict(rows)

    # Apply where to down select
    if where_query:
        logger.info(f"Applying where query: {where_query}")
        results_df = results_df.query(where_query)

    return results_df