inference/transform_predictor/predictor.py

# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Serves BQML models trained with TRANSFORM."""

import collections
import os
from typing import Any, Optional

from bigquery_ml_utils.inference.xgboost_predictor import predictor as bqml_xgboost_predictor
import tensorflow as tf

from bigquery_ml_utils.tensorflow_ops.load_module import load_module

gen_date_ops = load_module("_date_ops.so")
gen_datetime_ops = load_module("_datetime_ops.so")
gen_time_ops = load_module("_time_ops.so")
gen_timestamp_ops = load_module("_timestamp_ops.so")


class Predictor:
  """Class to feed input data into a BQML model trained with TRANSFORM.

  It performs both preprocessing and postprocessing on the input and output.
  """

  def __init__(
      self, transform_savedmodel, model_tensorflow=None, model_xgboost=None
  ):
    """Initializes a Predictor to serve BQML models trained with TRANSFORM.

    Args:
      transform_savedmodel: SavedModel pb of the TRANSFORM.
      model_tensorflow: BQML model in tensorflow savedmodel format.
      model_xgboost: BQML model in booster format.

    Returns:
      A 'Predictor' instance.
    """
    self._transform_savedmodel = transform_savedmodel
    self._model_tensorflow = model_tensorflow
    self._model_xgboost = model_xgboost
    # Number of input instances in the predict call.
    self._num_input = 0

  def _get_transform_result(self, raw_input):
    """Gets the TRANSFORM result from the raw input data.

    Args:
      raw_input: A list of raw input instances.

    Returns:
      TRANSFORM result.
    """
    # Pivot the row-major raw input into per-feature value lists.
    input_dict = collections.defaultdict(list)
    for row in raw_input:
      for key, value in row.items():
        input_dict[key].append(value)

    infer = self._transform_savedmodel.signatures['serving_default']
    input_signature = infer.structured_input_signature[1]
    transform_input = dict()
    for key, value in input_dict.items():
      if key not in input_signature:
        raise ValueError(f'"{key}" is not an input of the TRANSFORM.')
      transform_input[key] = tf.constant(value, input_signature[key].dtype)

    return infer(**transform_input)

  def _convert_transform_result(self, transform_result):
    """Converts the TRANSFORM result to a list.

    Args:
      transform_result: TRANSFORM result from the raw input.

    Returns:
      TRANSFORM results in a list. Each element in the list is the TRANSFORM
      result of a single raw input instance.
    """
    # If the TRANSFORM has only one output, return its values without the
    # output name.
    if len(transform_result) == 1:
      for value in transform_result.values():
        return value.numpy().tolist()

    # Initialize the output as a list of dicts.
    output = [{} for _ in range(self._num_input)]
    # Convert the transform_result in batch representation to a list, in which
    # each element is the transform result of a single raw input instance.
    for key, value in transform_result.items():
      if value.dtype == tf.string:
        batch_result = value.numpy().astype(str).tolist()
      else:
        batch_result = value.numpy().tolist()
      for i, value in enumerate(batch_result):
        if isinstance(value, list):
          output[i][key] = value.copy()
        else:
          output[i][key] = value

    return output
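
  # Illustrative sketch of the conversion above, with hypothetical feature
  # names f1 and f2: for self._num_input == 2, a TRANSFORM output such as
  #   {'f1': tf.constant([1.0, 2.0]),
  #    'f2': tf.constant([['a', 'b'], ['c', 'd']])}
  # is converted to a per-instance list:
  #   [{'f1': 1.0, 'f2': ['a', 'b']}, {'f1': 2.0, 'f2': ['c', 'd']}]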

  def _get_tf_model_result(self, transform_result):
    """Gets the model result from the TRANSFORM result for tensorflow models.

    Args:
      transform_result: TRANSFORM result from the raw input.

    Returns:
      Model prediction results in a list. Each element in the list is the
      prediction result of a single raw input instance.
    """
    model_input = dict()
    for key, value in transform_result.items():
      model_input[key] = value
      # A BQML-trained model uses float64 as the input dtype for all numerical
      # features.
      if value.dtype == tf.int64:
        model_input[key] = tf.cast(value, tf.float64)

    infer = None
    if 'predict' in self._model_tensorflow.signatures:
      # By default, a BQML DNN model is exported with the 'predict' signature.
      infer = self._model_tensorflow.signatures['predict']
    else:
      infer = self._model_tensorflow.signatures['serving_default']
    inference_result = infer(**model_input)

    # If the inference_result of the model contains only one named tensor, we
    # omit the name. This aligns with the TF serving response.
    if len(inference_result) == 1:
      for value in inference_result.values():
        # TODO(b/253233131): Support array<struct> as the output.
        return value.numpy().tolist()

    # Initialize the output as a list of dicts.
    output = [{} for i in range(self._num_input)]
    # Convert the inference_result in batch representation to a list, in which
    # each element is the prediction result of a single raw input instance.
    for key, value in inference_result.items():
      # TODO(b/253233131): Support array<struct> as the output.
      batch_result = list()
      # The numpy() of a string tensor yields bytes, which need to be converted
      # to str.
      if value.dtype == tf.string:
        batch_result = value.numpy().astype(str).tolist()
      else:
        batch_result = value.numpy().tolist()
      for i, value in enumerate(batch_result):
        if isinstance(value, list):
          output[i][key] = value.copy()
        else:
          output[i][key] = value

    return output

  def _get_xgboost_model_result(self, transform_result):
    """Gets the model result from the TRANSFORM result for xgboost models.

    Args:
      transform_result: TRANSFORM result from the raw input.

    Returns:
      Model prediction results in a list. Each element in the list is the
      prediction result of a single raw input instance.
    """
    xgb_model_input = [{} for i in range(self._num_input)]
    # Convert the transform_result in batch representation to a list, in which
    # each element is the transform result of a single raw input instance.
    for key, value in transform_result.items():
      # TODO(b/253233131): Support array<struct> as the output.
      batch_result = list()
      # The numpy() of a string tensor yields bytes, which need to be converted
      # to str.
      if value.dtype == tf.string:
        batch_result = value.numpy().astype(str).tolist()
      else:
        batch_result = value.numpy().tolist()
      for i, value in enumerate(batch_result):
        if isinstance(value, list):
          xgb_model_input[i][key] = value.copy()
        else:
          xgb_model_input[i][key] = value

    return self._model_xgboost.predict(xgb_model_input)

  def _get_model_result(self, transform_result):
    """Gets the model result from the TRANSFORM result.

    Args:
      transform_result: TRANSFORM result from the raw input.

    Returns:
      Model prediction results in a list. Each element in the list is the
      prediction result of a single raw input instance.
    """
    if self._model_tensorflow is not None:
      return self._get_tf_model_result(transform_result)

    if self._model_xgboost is not None:
      return self._get_xgboost_model_result(transform_result)

    return self._convert_transform_result(transform_result)

  def predict(self, raw_input, **kwargs):
    """Performs prediction.

    Args:
      raw_input: A list of prediction input instances.
      **kwargs: A dictionary of keyword args provided as additional fields on
        the predict request body.

    Returns:
      A list containing the prediction results.
    """
    del kwargs
    self._num_input = len(raw_input)
    return self._get_model_result(self._get_transform_result(raw_input))

  @classmethod
  def from_path(cls, model_dir):
    """Creates an instance of Predictor using the given path.

    Args:
      model_dir: The local directory that contains the BQML model trained with
        TRANSFORM.

    Returns:
      An instance of 'Predictor'.
    """
    if not tf.io.gfile.exists(os.path.join(model_dir, 'transform')):
      raise ValueError(
          'TRANSFORM subdirectory is not found in the given path.'
      )
    transform_savedmodel = tf.saved_model.load(
        os.path.join(model_dir, 'transform')
    )

    if tf.io.gfile.exists(os.path.join(model_dir, 'saved_model.pb')):
      model_tensorflow = tf.saved_model.load(model_dir)
      return cls(transform_savedmodel, model_tensorflow=model_tensorflow)

    if tf.io.gfile.exists(os.path.join(model_dir, 'model.bst')):
      model_xgboost = bqml_xgboost_predictor.Predictor.from_path(model_dir)
      return cls(transform_savedmodel, model_xgboost=model_xgboost)

    return cls(transform_savedmodel)