in assets/model_monitoring/components/src/generation_safety_quality/annotation_compute_histogram/run.py [0:0]
def annotate_batch(iterator):
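# receives an iterator of pandas batches on a worker and yields one
# annotated dataframe per input batch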
for batch in iterator:
# propagate the driver's environment variables to this executor
for env_var_key, env_var_value in driver_env_vars.items():
os.environ[env_var_key] = env_var_value
rows = []
input_columns = [PROMPT, COMPLETION]
if has_context:
input_columns.append(CONTEXT)
if has_ground_truth:
input_columns.append(GROUND_TRUTH)
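# capture columns that are not evaluated so they can be carried through
# to the output unchanged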
passthrough_cols = get_passthrough_cols(batch)
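# build one query/response record per row, adding the optional context
# and ground truth fields when those columns are present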
for _, row in batch.iterrows():
qca = {QUERY: row[PROMPT], RESPONSE: row[COMPLETION]}
if has_context:
qca[CONTEXT] = row[CONTEXT]
if has_ground_truth:
qca[GROUND_TRUTH] = row[GROUND_TRUTH]
rows.append(qca)
# ensure the promptflow debug output directory exists
os.makedirs(DEFAULT_PROMPTFLOW_PATH, exist_ok=True)
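# make sure this worker logs to the provided MLflow tracking server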
mlflow.set_tracking_uri(tracking_uri)
input_dir = tempfile.TemporaryDirectory()
evaluation_name = "gsq-evaluation"
run_name = evaluation_name + "-child-run"
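# Azure OpenAI settings for the annotation model, shared by every evaluator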
model_config = AzureOpenAIModelConfiguration(
azure_endpoint=api_base,
api_key=api_key,
azure_deployment=model_deployment_name,
api_version=api_version,
)
evaluators = {}
evaluator_config = {}
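# instantiate one evaluator per requested metric and map the columns it
# expects onto the fields of the jsonl records written below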
for metric_name in metrics_list:
evaluator = EVALUATOR_NAME_TO_CLASS[metric_name]
metric_name_compact = COLUMN_TO_COMPACT_METRIC_NAME[
metric_name].lower()
evaluators[metric_name_compact] = evaluator(
model_config=model_config)
evaluator_config[metric_name_compact] = {
COLUMN_MAPPING: {
RESPONSE: format_data_column(RESPONSE),
QUERY: format_data_column(QUERY)
}
}
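# extend the column mapping with the optional context / ground truth fields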
config = evaluator_config[metric_name_compact]
if has_context:
config[COLUMN_MAPPING][CONTEXT] = format_data_column(CONTEXT)
if has_ground_truth:
config[COLUMN_MAPPING][GROUND_TRUTH] = format_data_column(GROUND_TRUTH)
# write rows to jsonl file
input_file_name = "eval_input_" + str(uuid.uuid4()) + ".jsonl"
input_file_path = os.path.join(input_dir.name, input_file_name)
with open(input_file_path, "w") as f:
for row in rows:
f.write(json.dumps(row) + '\n')
# attach to the existing parent run
with mlflow.start_run():
# create a nested child run for this batch
with mlflow.start_run(nested=True, run_name=run_name) as run:
tabular_result = evaluate(
evaluation_name=evaluation_name,
data=input_file_path,
evaluators=evaluators,
evaluator_config=evaluator_config
)
child_run_id = run.info.run_id
# add promptflow debug logs to run
hostname = socket.gethostname()
artifact_path = f"worker_promptflow/{hostname}"
client = mlflow.tracking.MlflowClient()
client.log_artifacts(
child_run_id, DEFAULT_PROMPTFLOW_PATH, artifact_path=artifact_path)
# convert rows from result to pandas dataframe
tabular_result = pd.DataFrame(tabular_result["rows"])
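# carry the passthrough columns over to the annotated result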
for passthrough_column, passthrough_values in passthrough_cols.items():
tabular_result[passthrough_column] = passthrough_values
# rename metric columns
try:
for column_name in metrics_list:
metric_name_compact = COLUMN_TO_COMPACT_METRIC_NAME[column_name]
# output column names follow a schema like "outputs.coherence.gpt_coherence"
result_name = ("outputs." + metric_name_compact + "." + column_name).lower()
# coerce non-numeric annotations to NaN, then set failures to -1
tabular_result[result_name] = pd.to_numeric(tabular_result[result_name], errors='coerce')
tabular_result[result_name] = tabular_result[result_name].fillna(-1)
tabular_result.rename(columns={result_name: metric_name_compact}, inplace=True)
for column_name in input_columns:
# input column names follow a schema like "inputs.context"
if column_name == PROMPT:
result_name = "inputs." + QUERY
elif column_name == COMPLETION:
result_name = "inputs." + RESPONSE
else:
result_name = "inputs." + column_name
tabular_result.rename(columns={result_name: column_name}, inplace=True)
except KeyError as e:
# re-raise as a user-facing error with more context
raise InvalidInputError(
f"Unable to retrieve and rename {column_name}. "
f"Usually this indicates an invalid configuration or connection. "
f"Please check the model_deployment_name and workspace_connection_arm_id. "
f"For more detailed error information please see the promptflow "
f"debug folder in the child run with run id {child_run_id}."
) from e
yield tabular_result
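# Usage sketch, not part of this excerpt: a function of this shape is
# typically applied with Spark's mapInPandas. `input_df` and `output_schema`
# are hypothetical names for the source dataframe and the annotated schema.
#     annotated_df = input_df.mapInPandas(annotate_batch, schema=output_schema)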