def predict()

in assets/training/model_evaluation/src/model_prediction.py


    def predict(self, data):
        """Predict.

        Args:
            data (_type_): _description_

        Returns:
            _type_: _description_
        """
        predictions, pred_probas, y_test, performance = pd.DataFrame(), pd.DataFrame(), pd.DataFrame(), pd.DataFrame()

        enable_token_counts = self.config.get("token_count_per_sample", False)
        enable_character_counts = self.config.get("char_count_per_sample", False)
        tokenizer = self.load_tokenizer(enable_token_counts)

        pipeline_params = filter_pipeline_params(self.config, self.predictor.model_flavor, self.predictor)

        try:
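            # Iterate over the (features, labels) batches yielded by the data loader.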
            for idx, (X_test, y_test_batch) in enumerate(data):
                logger.info("batch: " + str(idx))
                if len(X_test) == 0:
                    logger.info("No samples in batch. Skipping.")
                    continue

                y_transformer = None

                pred_probas_batch = None

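                # Transformers-flavor predictors do not accept these extra kwargs;
                # merge them into the predict call for every other flavor.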
                def add_to_predict_params_if_applicable(dict_to_add, predict_params):
                    if self.predictor.model_flavor != constants.MODEL_FLAVOR.TRANSFORMERS:
                        predict_params = {**predict_params, **dict_to_add}
                    return predict_params

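                # Build task-specific predict parameters and dispatch to the
                # predictor, timing each call.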
                predict_params = add_to_predict_params_if_applicable(
                    {"y_transformer": y_transformer, "multilabel": self.multilabel}, {})
                if self.task == constants.TASK.TRANSLATION:
                    source_lang = self.config.get("source_lang", None)
                    target_lang = self.config.get("target_lang", None)
                    start_ms = time.time() * 1000
                    predict_params = add_to_predict_params_if_applicable(
                        {"source_lang": source_lang, "target_lang": target_lang}, predict_params)
                    predictions_batch = self.predictor.predict(X_test, **predict_params)
                    end_ms = time.time() * 1000
                    latency_ms = end_ms - start_ms
                elif self.task == constants.TASK.CHAT_COMPLETION:
                    start_ms = time.time() * 1000
                    predictions_batch = self.predictor.predict(X_test, self.input_column_names,
                                                               y_transformer=y_transformer,
                                                               multilabel=self.multilabel,
                                                               masks_required=self.masks_required,
                                                               **pipeline_params)
                    end_ms = time.time() * 1000
                    latency_ms = end_ms - start_ms
                else:
                    # batching is handled in mlflow predict for image tasks.
                    if self.task in constants.IMAGE_TASKS:
                        pipeline_params.update(self.config)
                        if self.batch_size:
                            pipeline_params.update({ArgumentLiterals.BATCH_SIZE: self.batch_size})
                    start_ms = time.time() * 1000
                    predict_params = add_to_predict_params_if_applicable(
                        {"masks_required": self.masks_required}, predict_params)

                    predictions_batch = self.predictor.predict(X_test, **{**predict_params, **pipeline_params})
                    end_ms = time.time() * 1000
                    latency_ms = end_ms - start_ms
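                # Classification tasks additionally produce class probabilities.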
                if self.task in constants.CLASSIFICATION_SET:
                    pred_probas_batch = self.predictor.predict_proba(X_test, **{**predict_params, **pipeline_params})

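                # Coerce raw predictions and probabilities into DataFrames.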
                if not isinstance(predictions_batch, pd.DataFrame):
                    if self.task == constants.TASK.CHAT_COMPLETION:
                        predictions_batch = pd.DataFrame(predictions_batch)
                    else:
                        predictions_df = pd.DataFrame()
                        predictions_df[constants.PREDICTIONS_COLUMN_NAME] = predictions_batch
                        predictions_batch = predictions_df
                if pred_probas_batch is not None and not isinstance(pred_probas_batch, pd.DataFrame):
                    pred_probas_batch = pd.DataFrame(pred_probas_batch)
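                # Build the ground-truth frame for this batch (task-dependent).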
                if y_test_batch is not None and self.task != constants.TASK.CHAT_COMPLETION:
                    cols = []
                    if self.extra_y_test_cols is not None:
                        cols += self.extra_y_test_cols
                    if self.label_column_name is not None:
                        cols += [self.label_column_name]
                    y_test_batch = pd.DataFrame(y_test_batch, index=X_test.index, columns=cols)
                    # Below code won't work with extra_cols
                    if self.label_column_name is not None \
                            and isinstance(y_test_batch[self.label_column_name].iloc[0], str) \
                            and self.task in constants.MULTIPLE_OUTPUTS_SET:
                        y_test_batch[self.label_column_name] = y_test_batch[self.label_column_name].apply(
                            lambda x: ast.literal_eval(x)
                        )
                    if self.task == constants.TASK.QnA:
                        for col in X_test.columns:
                            y_test_batch[col] = X_test[col]
                elif self.task == constants.TASK.CHAT_COMPLETION:
                    logger.info("Empty/NaN ground truths will replaced with Empty string values ('').")
                    if self.label_column_name is not None:
                        y_test_batch = pd.DataFrame(y_test_batch, columns=[self.label_column_name]).fillna("")
                    else:
                        logger.info("No label column found. Trying to parse test data for ground truths.")
                        y_test_batch = pd.DataFrame(X_test[self.input_column_names[0]].apply(
                            lambda x: x[-1]['content'] if x[-1]['role'] == 'assistant' else ""))
                else:
                    y_test_batch = pd.DataFrame({})
                if self.task != constants.TASK.CHAT_COMPLETION:
                    predictions_batch.index = X_test.index

                if pred_probas_batch is not None:
                    pred_probas_batch.index = X_test.index
                else:
                    pred_probas_batch = pd.DataFrame({})

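                # Per-row performance telemetry: batch size, start/end timestamps, latency.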
                performance_batch = pd.DataFrame({})
                performance_batch[constants.PerformanceColumns.BATCH_SIZE_COLUMN_NAME] = \
                    [len(predictions_batch) for _ in range(len(predictions_batch))]

                start_time_iso_string = datetime.fromtimestamp(start_ms / 1000, timezone.utc).isoformat()
                end_time_iso_string = datetime.fromtimestamp(end_ms / 1000, timezone.utc).isoformat()
                performance_batch[constants.PerformanceColumns.START_TIME_COLUMN_NAME] = \
                    [start_time_iso_string for _ in range(len(predictions_batch))]
                performance_batch[constants.PerformanceColumns.END_TIME_COLUMN_NAME] = \
                    [end_time_iso_string for _ in range(len(predictions_batch))]
                performance_batch[constants.PerformanceColumns.LATENCY_COLUMN_NAME] = \
                    [latency_ms for _ in range(len(predictions_batch))]

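                # Optional per-sample character counts for text tasks.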
                if self.task in constants.TEXT_TOKEN_TASKS and enable_character_counts:
                    char_time_start = time.time()
                    if self.task in constants.TEXT_OUTPUT_TOKEN_TASKS:
                        performance_batch[constants.PerformanceColumns.OUTPUT_CHARACTERS_COLUMN_NAME] = \
                            [len(pred) for pred in predictions_batch[predictions_batch.columns.values[0]]]

                    if self.task == constants.TASK.QnA:
                        performance_batch[constants.PerformanceColumns.INPUT_CHARACTERS_COLUMN_NAME] = \
                            [len(q) + len(a) for q, a in zip(X_test[X_test.columns.values[0]],
                                                             X_test[X_test.columns.values[1]])]
                    else:
                        performance_batch[constants.PerformanceColumns.INPUT_CHARACTERS_COLUMN_NAME] = \
                            [len(inp) for inp in X_test[X_test.columns.values[0]]]
                    logger.info(f"Character count time ms: {(time.time() - char_time_start) * 1000}")
                elif self.task not in constants.TEXT_TOKEN_TASKS and enable_character_counts:
                    logger.warning("Character counts not supported for this task type; "
                                   "no character counts will be returned.")

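                # Optional per-sample token counts when a tokenizer was loaded.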
                if tokenizer is not None:
                    token_time_start = time.time()
                    if self.task in constants.TEXT_OUTPUT_TOKEN_TASKS:
                        curr_predictions = list(predictions_batch[predictions_batch.columns.values[0]])
                        tokenized_predictions = tokenizer(curr_predictions)["input_ids"]
                        pred_num_tokens = [len(tokenized_input) for tokenized_input in tokenized_predictions]
                        performance_batch[constants.PerformanceColumns.OUTPUT_TOKENS_COLUMN_NAME] = pred_num_tokens

                    if self.task == constants.TASK.QnA:
                        tokenized_inputs = tokenizer(
                            list(X_test[X_test.columns.values[0]]), list(X_test[X_test.columns.values[1]])
                        )["input_ids"]
                    elif self.task == constants.TASK.CHAT_COMPLETION:
                        input_data = X_test[X_test.columns.values[0]]
                        tokenized_inputs = [
                            list(chain(*tokenizer([chat['content'] for chat in conversation])["input_ids"]))
                            for conversation in input_data
                        ]
                    else:
                        tokenized_inputs = tokenizer(list(X_test[X_test.columns.values[0]]))["input_ids"]
                    input_num_tokens = [len(tokenized_input) for tokenized_input in tokenized_inputs]
                    performance_batch[constants.PerformanceColumns.INPUT_TOKENS_COLUMN_NAME] = input_num_tokens
                    logger.info(f"Token count time ms: {(time.time() - token_time_start) * 1000}")

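                # Accumulate this batch into the running result frames.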
                predictions = pd.concat([predictions, predictions_batch], axis=0)
                pred_probas = pd.concat([pred_probas, pred_probas_batch], axis=0)
                y_test = pd.concat([y_test, y_test_batch], axis=0)
                performance = pd.concat([performance, performance_batch], axis=0)
                if self.task in [constants.TASK.IMAGE_OBJECT_DETECTION, constants.TASK.IMAGE_INSTANCE_SEGMENTATION]:
                    y_test["image_meta_info"] = X_test["image_meta_info"]

                logger.info(f"Latency (ms) for this batch: {latency_ms}")
        except ValueError as e:
            exception = get_azureml_exception(PredictException, ModelPredictionUserError, e, error=repr(e))
            log_traceback(exception, logger)
            raise exception
        except Exception as e:
            exception = get_azureml_exception(PredictException, ModelPredictionInternalError, e,
                                              wrap_azureml_ex=False, error=repr(e))
            if isinstance(e, (IndexError, RuntimeError)):
                log_traceback(exception, logger, constants.ErrorStrings.TorchErrorMessage)
            else:
                log_traceback(exception, logger)
            raise exception

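        # No predictions across all batches is treated as empty input data.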
        if self.batch_size is not None and len(predictions) == 0:
            exception = get_azureml_exception(DataValidationException, EmptyInputData, None)
            log_traceback(exception, logger)
            raise exception
        return predictions, pred_probas, y_test, performance
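
The method consumes data as an iterable of (X_test, y_test_batch) pairs and returns four pandas DataFrames accumulated across batches. Below is a minimal sketch of that batch contract with a hypothetical loader; the column names, batch size, and make_batches helper are illustrative assumptions and are not taken from this file.

    import pandas as pd

    def make_batches(X, y, batch_size=2):
        # Hypothetical loader: yield aligned (features, labels) DataFrame slices,
        # mirroring the (X_test, y_test_batch) pairs that predict() iterates over.
        for start in range(0, len(X), batch_size):
            yield X.iloc[start:start + batch_size], y.iloc[start:start + batch_size]

    X = pd.DataFrame({"text": ["hello", "goodbye", "thanks"]})
    y = pd.DataFrame({"label": ["greeting", "farewell", "gratitude"]})

    for idx, (X_test, y_test_batch) in enumerate(make_batches(X, y)):
        print(idx, list(X_test.index), list(y_test_batch["label"]))

    # predict(data) would return (predictions, pred_probas, y_test, performance),
    # each a DataFrame built up from these batches.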