def _assign_flow_values()

in scripts/promptflow-ci/utils/flow_utils.py [0:0]


def _apply_node_overrides(flow_dag, values):
    """Copy "connection"/"inputs" overrides from values["nodes"] onto same-named DAG nodes."""
    if "nodes" not in values:
        return
    for override_node in values["nodes"]:
        for flow_node in flow_dag["nodes"]:
            if flow_node["name"] != override_node["name"]:
                continue
            if "connection" in override_node:
                # Override connection
                flow_node["connection"] = override_node["connection"]
            if "inputs" in override_node:
                # A node may have no "inputs" mapping yet (missing key or empty
                # YAML value); create one instead of failing on the lookup.
                node_inputs = flow_node.get("inputs")
                if node_inputs is None:
                    node_inputs = flow_node["inputs"] = {}
                # Override input
                node_inputs.update(override_node["inputs"])


def _build_default_test_data(flow_dag):
    """Return a one-row data set built from the "default" of each flow input."""
    row = {}
    # Tolerate flows that declare no inputs, or inputs without any properties.
    for input_name, input_spec in (flow_dag.get("inputs") or {}).items():
        default = (input_spec or {}).get("default")
        # Empty list/string defaults are replaced by a "default" placeholder.
        # A new list is created so the loaded DAG itself is not mutated.
        if isinstance(default, list) and not default:
            default = ["default"]
        elif default == "":
            default = "default"
        row[input_name] = default
    return [row]


def _assign_flow_values(flow_dirs):
    """Assign the flow values and update flow.dag.yaml.

    For every directory in ``flow_dirs``:

    1. ``flow.dag.yaml`` is loaded from the flow directory.
    2. ``CONFIG_ROOT/<flow_name>/CONFIG_FILE`` is looked up, where
       ``flow_name`` is the name of the flow directory's parent directory.
       When it exists, its "nodes" entries override the "connection" and
       "inputs" of the DAG nodes with the same name and the DAG is written
       back; otherwise a warning is logged and the DAG file is left as is.
    3. ``CONFIG_ROOT/<flow_name>/TEST_DATA_FILE`` is copied into the flow
       directory when it exists; otherwise ``test_data.json`` is generated
       there from the default values of the flow inputs.

    Args:
        flow_dirs: Iterable of flow directories (``pathlib.Path`` objects or
            path strings).

    Returns:
        The ``flow_dirs`` argument, unchanged.
    """
    log_debug("\n=======Start overriding values for flows=======")
    for flow_dir in flow_dirs:
        flow_path = Path(flow_dir)
        dag_path = flow_path / "flow.dag.yaml"
        # Read with the same encoding that is used when writing the file back,
        # so non-ASCII content survives the round trip on every platform.
        with open(dag_path, "r", encoding="utf-8") as dag_file:
            flow_dag = yaml.safe_load(dag_file)

        flow_name = flow_path.parents[0].name
        config_dir = Path(CONFIG_ROOT) / flow_name
        config_file = config_dir / CONFIG_FILE
        if config_dir.exists() and config_file.exists():
            with open(config_file, "r", encoding="utf-8") as fin:
                values = json.load(fin)
            # Override connection/inputs in nodes
            log_debug(f"Start overriding values for nodes for '{flow_dir}'.")
            _apply_node_overrides(flow_dag, values)

            with open(dag_path, "w", encoding="utf-8") as dag_file:
                yaml.dump(flow_dag, dag_file, allow_unicode=True)
        else:
            log_warning(
                f"No available flow values found for '{flow_name}'. Skip flow values assignment.")

        test_data_file = config_dir / TEST_DATA_FILE
        if config_dir.exists() and test_data_file.exists():
            log_debug("Using the test_data.json file from the test-configs to run the flow.")
            shutil.copy(test_data_file, flow_path)
        else:
            # Build the rows before opening the file so a failure cannot leave
            # a truncated test_data.json behind.
            datas = _build_default_test_data(flow_dag)
            with open(flow_path / "test_data.json", "w", encoding="utf-8") as data_file:
                json.dump(datas, data_file, indent=4)
    log_debug("=======Complete overriding values for flows=======\n")
    return flow_dirs