def annotate_batch()

in assets/model_monitoring/components/src/generation_safety_quality/annotation_compute_histogram/run.py


    def annotate_batch(iterator):
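        """Annotate one batch of production data with generation safety & quality metrics.

        Each pandas DataFrame pulled from the iterator is converted into query/response
        records (plus optional context and ground truth), scored with the configured
        Azure OpenAI evaluators, and yielded back as a DataFrame containing the renamed
        metric columns, the original input columns, and any passthrough columns.
        """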
        for batch in iterator:
            # add environment variables on executors
            for env_var_key, env_var_value in driver_env_vars.items():
                os.environ[env_var_key] = env_var_value
            rows = []
            input_columns = [PROMPT, COMPLETION]
            if has_context:
                input_columns.append(CONTEXT)
            if has_ground_truth:
                input_columns.append(GROUND_TRUTH)
            passthrough_cols = get_passthrough_cols(batch)
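            # build one query/response record (with optional context and ground truth) per input row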
            for index, row in batch.iterrows():
                qca = {QUERY: row[PROMPT], RESPONSE: row[COMPLETION]}
                if has_context:
                    qca[CONTEXT] = row[CONTEXT]
                if has_ground_truth:
                    qca[GROUND_TRUTH] = row[GROUND_TRUTH]
                rows.append(qca)

            # ensure the promptflow debug-log directory exists
            os.makedirs(DEFAULT_PROMPTFLOW_PATH, exist_ok=True)
            mlflow.set_tracking_uri(tracking_uri)

            input_dir = tempfile.TemporaryDirectory()
            evaluation_name = "gsq-evaluation"
            run_name = evaluation_name + "-child-run"
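            # Azure OpenAI deployment used by the evaluators to score each metric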
            model_config = AzureOpenAIModelConfiguration(
                azure_endpoint=api_base,
                api_key=api_key,
                azure_deployment=model_deployment_name,
                api_version=api_version,
            )
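            # one evaluator instance per requested metric; evaluator_config maps each
            # evaluator's inputs (query, response, context, ground truth) onto the data columns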
            evaluators = {}
            evaluator_config = {}
            for metric_name in metrics_list:
                evaluator = EVALUATOR_NAME_TO_CLASS[metric_name]
                metric_name_compact = COLUMN_TO_COMPACT_METRIC_NAME[
                    metric_name].lower()
                evaluators[metric_name_compact] = evaluator(
                    model_config=model_config)
                evaluator_config[metric_name_compact] = {
                    COLUMN_MAPPING: {
                        RESPONSE: format_data_column(RESPONSE),
                        QUERY: format_data_column(QUERY)
                    }
                }
                config = evaluator_config[metric_name_compact]
                if has_context:
                    config[COLUMN_MAPPING][CONTEXT] = format_data_column(CONTEXT)
                if has_ground_truth:
                    config[COLUMN_MAPPING][GROUND_TRUTH] = format_data_column(GROUND_TRUTH)
            # write rows to jsonl file
            input_file_name = "eval_input_" + str(uuid.uuid4()) + ".jsonl"
            input_file_path = os.path.join(input_dir.name, input_file_name)
            with open(input_file_path, "w") as f:
                for row in rows:
                    f.write(json.dumps(row) + '\n')
            # attach to the existing parent run
            with mlflow.start_run():
                # create a nested child run for this batch's evaluation
                with mlflow.start_run(nested=True, run_name=run_name) as run:
                    tabular_result = evaluate(
                        evaluation_name=evaluation_name,
                        data=input_file_path,
                        evaluators=evaluators,
                        evaluator_config=evaluator_config
                    )
                    child_run_id = run.info.run_id
                    # add promptflow debug logs to run
                    hostname = socket.gethostname()
                    artifact_path = f"worker_promptflow/{hostname}"
                    client = mlflow.tracking.MlflowClient()
                    client.log_artifacts(
                        child_run_id, DEFAULT_PROMPTFLOW_PATH, artifact_path=artifact_path)
            # convert rows from result to pandas dataframe
            tabular_result = pd.DataFrame(tabular_result["rows"])
            for passthrough_column, passthrough_values in passthrough_cols.items():
                tabular_result[passthrough_column] = passthrough_values
            # rename metric columns
            try:
                for column_name in metrics_list:
                    metric_name_compact = COLUMN_TO_COMPACT_METRIC_NAME[column_name]
                    # output column names follow a schema like "outputs.coherence.gpt_coherence"
                    result_name = ("outputs." + metric_name_compact + "." + column_name).lower()
                    # coerce non-numeric values to NaN, then mark failed annotations with -1
                    tabular_result[result_name] = pd.to_numeric(
                        tabular_result[result_name], errors='coerce').fillna(-1)
                    tabular_result.rename(columns={result_name: metric_name_compact}, inplace=True)
                for column_name in input_columns:
                    # input column names follow schema like "inputs.context"
                    if column_name == PROMPT:
                        result_name = "inputs." + QUERY
                    elif column_name == COMPLETION:
                        result_name = "inputs." + RESPONSE
                    else:
                        result_name = "inputs." + column_name
                    tabular_result.rename(columns={result_name: column_name}, inplace=True)
            except KeyError as e:
                # raise new user error with more context
                raise InvalidInputError(
                    f"Unable to retrieve and rename {column_name}. "
                    f"Usually this indicates an invalid configuration or connection. "
                    f"Please check the model_deployment_name and workspace_connection_arm_id. "
                    f"For more detailed error information please see the promptflow "
                    f"debug folder in the child run with run id {child_run_id}."
                ) from e
            yield tabular_result
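
Because annotate_batch consumes an iterator of pandas DataFrames and yields annotated DataFrames, the enclosing component presumably applies it with Spark's mapInPandas. The sketch below only illustrates that call pattern; the input dataframe, column names, and output schema are hypothetical, not the component's actual schema.

    # Hypothetical usage sketch, assuming a Spark session and that annotate_batch is in scope.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # example input with the prompt/completion columns the annotator reads
    production_df = spark.createDataFrame(
        [("What is MLflow?", "MLflow is an open source ML platform.")],
        schema="prompt string, completion string",
    )

    # the output schema must list every column the annotator yields:
    # the renamed metric columns, the original inputs, and any passthrough columns
    output_schema = "prompt string, completion string, Coherence double"

    annotated_df = production_df.mapInPandas(annotate_batch, schema=output_schema)
    annotated_df.show()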