in assets/training/model_evaluation/src/model_prediction.py [0:0]
def predict(self, data):
"""Predict.
Args:
data (_type_): _description_
Returns:
_type_: _description_
"""
predictions, pred_probas, y_test, performance = pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
enable_token_counts = self.config.get("token_count_per_sample", False)
enable_character_counts = self.config.get("char_count_per_sample", False)
tokenizer = self.load_tokenizer(enable_token_counts)
pipeline_params = filter_pipeline_params(self.config, self.predictor.model_flavor, self.predictor)
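# Score the data batch by batch, accumulating predictions, class probabilities,
# ground truths and per-row performance metadata.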
try:
for idx, (X_test, y_test_batch) in enumerate(data):
logger.info("batch: " + str(idx))
if len(X_test) == 0:
logger.info("No samples in batch. Skipping.")
continue
y_transformer = None
pred_probas_batch = None
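# Helper: merge the given kwargs into predict_params only when the model
# flavor is not transformers.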
def add_to_predict_params_if_applicable(dict_to_add, predict_params):
if self.predictor.model_flavor != constants.MODEL_FLAVOR.TRANSFORMERS:
predict_params = {**predict_params, **dict_to_add}
return predict_params
predict_params = add_to_predict_params_if_applicable(
{"y_transformer": y_transformer, "multilabel": self.multilabel}, {})
if self.task == constants.TASK.TRANSLATION:
source_lang = self.config.get("source_lang", None)
target_lang = self.config.get("target_lang", None)
start_ms = time.time() * 1000
predict_params = add_to_predict_params_if_applicable(
{"source_lang": source_lang, "target_lang": target_lang}, predict_params)
predictions_batch = self.predictor.predict(X_test, **predict_params)
end_ms = time.time() * 1000
latency_ms = end_ms - start_ms
elif self.task == constants.TASK.CHAT_COMPLETION:
start_ms = time.time() * 1000
predictions_batch = self.predictor.predict(X_test, self.input_column_names,
y_transformer=y_transformer,
multilabel=self.multilabel,
masks_required=self.masks_required,
**pipeline_params)
end_ms = time.time() * 1000
latency_ms = end_ms - start_ms
else:
# batching is handled in mlflow predict for image tasks.
if self.task in constants.IMAGE_TASKS:
pipeline_params.update(self.config)
if self.batch_size:
pipeline_params.update({ArgumentLiterals.BATCH_SIZE: self.batch_size})
start_ms = time.time() * 1000
predict_params = add_to_predict_params_if_applicable(
{"masks_required": self.masks_required}, predict_params)
predictions_batch = self.predictor.predict(X_test, **{**predict_params, **pipeline_params})
end_ms = time.time() * 1000
latency_ms = end_ms - start_ms
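# Classification tasks additionally collect class probabilities.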
if self.task in constants.CLASSIFICATION_SET:
pred_probas_batch = self.predictor.predict_proba(X_test, **{**predict_params, **pipeline_params})
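# Normalize raw predictions (and probabilities, if present) into DataFrames
# for uniform downstream handling.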
if not isinstance(predictions_batch, pd.DataFrame):
if self.task == constants.TASK.CHAT_COMPLETION:
predictions_batch = pd.DataFrame(predictions_batch)
else:
predictions_df = pd.DataFrame()
predictions_df[constants.PREDICTIONS_COLUMN_NAME] = predictions_batch
predictions_batch = predictions_df
if pred_probas_batch is not None and not isinstance(pred_probas_batch, pd.DataFrame):
pred_probas_batch = pd.DataFrame(pred_probas_batch)
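# Build the ground-truth DataFrame for this batch; for chat completion the
# labels may be parsed from the last assistant turn when no label column is provided.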
if y_test_batch is not None and self.task != constants.TASK.CHAT_COMPLETION:
cols = []
if self.extra_y_test_cols is not None:
cols += self.extra_y_test_cols
if self.label_column_name is not None:
cols += [self.label_column_name]
y_test_batch = pd.DataFrame(y_test_batch, index=X_test.index, columns=cols)
# Note: the label parsing below does not handle extra_y_test_cols.
if self.label_column_name is not None \
and isinstance(y_test_batch[self.label_column_name].iloc[0], str) \
and self.task in constants.MULTIPLE_OUTPUTS_SET:
y_test_batch[self.label_column_name] = y_test_batch[self.label_column_name].apply(
lambda x: ast.literal_eval(x)
)
if self.task == constants.TASK.QnA:
for col in X_test.columns:
y_test_batch[col] = X_test[col]
elif self.task == constants.TASK.CHAT_COMPLETION:
logger.info("Empty/NaN ground truths will replaced with Empty string values ('').")
if self.label_column_name is not None:
y_test_batch = pd.DataFrame(y_test_batch, columns=[self.label_column_name]).fillna("")
else:
logger.info("No label column found. Trying to parse test data for ground truths.")
y_test_batch = pd.DataFrame(X_test[self.input_column_names[0]].apply(
lambda x: x[-1]['content'] if x[-1]['role'] == 'assistant' else ""))
else:
y_test_batch = pd.DataFrame({})
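# Align prediction and probability indices with the input rows (skipped for chat completion).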
if self.task != constants.TASK.CHAT_COMPLETION:
predictions_batch.index = X_test.index
if pred_probas_batch is not None:
pred_probas_batch.index = X_test.index
else:
pred_probas_batch = pd.DataFrame({})
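# Per-row performance metadata: batch size, UTC start/end timestamps (ISO 8601)
# and latency for this batch.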
performance_batch = pd.DataFrame({})
performance_batch[constants.PerformanceColumns.BATCH_SIZE_COLUMN_NAME] = \
[len(predictions_batch) for _ in range(len(predictions_batch))]
start_time_iso_string = datetime.fromtimestamp(start_ms / 1000, timezone.utc).isoformat()
end_time_iso_string = datetime.fromtimestamp(end_ms / 1000, timezone.utc).isoformat()
performance_batch[constants.PerformanceColumns.START_TIME_COLUMN_NAME] = \
[start_time_iso_string for _ in range(len(predictions_batch))]
performance_batch[constants.PerformanceColumns.END_TIME_COLUMN_NAME] = \
[end_time_iso_string for _ in range(len(predictions_batch))]
performance_batch[constants.PerformanceColumns.LATENCY_COLUMN_NAME] = \
[latency_ms for _ in range(len(predictions_batch))]
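# Optional character counts for text tasks: output characters from the predictions
# and input characters from the test data (QnA sums the lengths of both input columns).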
if self.task in constants.TEXT_TOKEN_TASKS and enable_character_counts:
char_time_start = time.time()
if self.task in constants.TEXT_OUTPUT_TOKEN_TASKS:
performance_batch[constants.PerformanceColumns.OUTPUT_CHARACTERS_COLUMN_NAME] = \
[len(pred) for pred in predictions_batch[predictions_batch.columns.values[0]]]
if self.task == constants.TASK.QnA:
performance_batch[constants.PerformanceColumns.INPUT_CHARACTERS_COLUMN_NAME] = \
[len(q) + len(a) for q, a in zip(X_test[X_test.columns.values[0]],
X_test[X_test.columns.values[1]])]
else:
performance_batch[constants.PerformanceColumns.INPUT_CHARACTERS_COLUMN_NAME] = \
[len(inp) for inp in X_test[X_test.columns.values[0]]]
logger.info(f"Character count time ms: {(time.time() - char_time_start) * 1000}")
elif self.task not in constants.TEXT_TOKEN_TASKS and enable_character_counts:
logger.warning("Character counts not supported for this task type; "
"no character counts will be returned.")
if tokenizer is not None:
token_time_start = time.time()
if self.task in constants.TEXT_OUTPUT_TOKEN_TASKS:
curr_predictions = list(predictions_batch[predictions_batch.columns.values[0]])
tokenized_predictions = tokenizer(curr_predictions)["input_ids"]
pred_num_tokens = [len(tokenized_input) for tokenized_input in tokenized_predictions]
performance_batch[constants.PerformanceColumns.OUTPUT_TOKENS_COLUMN_NAME] = pred_num_tokens
if self.task == constants.TASK.QnA:
tokenized_inputs = tokenizer(
list(X_test[X_test.columns.values[0]]), list(X_test[X_test.columns.values[1]])
)["input_ids"]
elif self.task == constants.TASK.CHAT_COMPLETION:
input_data = X_test[X_test.columns.values[0]]
tokenized_inputs = [
list(chain(*tokenizer([chat['content'] for chat in conversation])["input_ids"]))
for conversation in input_data
]
else:
tokenized_inputs = tokenizer(list(X_test[X_test.columns.values[0]]))["input_ids"]
input_num_tokens = [len(tokenized_input) for tokenized_input in tokenized_inputs]
performance_batch[constants.PerformanceColumns.INPUT_TOKENS_COLUMN_NAME] = input_num_tokens
logger.info(f"Token count time ms: {(time.time() - token_time_start) * 1000}")
predictions = pd.concat([predictions, predictions_batch], axis=0)
pred_probas = pd.concat([pred_probas, pred_probas_batch], axis=0)
y_test = pd.concat([y_test, y_test_batch], axis=0)
performance = pd.concat([performance, performance_batch], axis=0)
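# Detection and segmentation tasks carry the image metadata over to the ground-truth DataFrame.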
if self.task in [constants.TASK.IMAGE_OBJECT_DETECTION, constants.TASK.IMAGE_INSTANCE_SEGMENTATION]:
y_test["image_meta_info"] = X_test["image_meta_info"]
logger.info(f"Latency (ms) for this batch: {latency_ms}")
except ValueError as e:
exception = get_azureml_exception(PredictException, ModelPredictionUserError, e, error=repr(e))
log_traceback(exception, logger)
raise exception
except Exception as e:
exception = get_azureml_exception(PredictException, ModelPredictionInternalError, e,
wrap_azureml_ex=False, error=repr(e))
if isinstance(e, (IndexError, RuntimeError)):
log_traceback(exception, logger, constants.ErrorStrings.TorchErrorMessage)
else:
log_traceback(exception, logger)
raise exception
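# If batching was requested but nothing was scored, raise an empty-input
# validation error instead of returning empty results.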
if self.batch_size is not None and len(predictions) == 0:
exception = get_azureml_exception(DataValidationException, EmptyInputData, None)
log_traceback(exception, logger)
raise exception
return predictions, pred_probas, y_test, performance