in horovod/spark/keras/estimator.py [0:0]
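
# Note: this method relies on module-level imports elsewhere in estimator.py:
# `numbers` and `numpy as np` (both used below), plus the Horovod helpers
# `util` and `remote` (presumably horovod.spark.common.util and
# horovod.spark.keras.remote).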
def _transform(self, df):
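    # Builds a picklable predict() closure around the trained model and
    # applies it to every partition of the input DataFrame.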
    keras_utils = self._get_keras_utils()
    floatx = self._get_floatx()
    serialized_model = keras_utils.serialize_model(self.getModel())

    label_cols = self.getLabelColumns()
    output_cols = self.getOutputCols()
    feature_cols = self.getFeatureColumns()
    custom_objects = self.getCustomObjects()
    metadata = self._get_metadata()
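
    # Everything predict() needs is captured above as plain locals so the
    # closure can be pickled and shipped to the Spark executors.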
    pin_cpu = remote._pin_cpu_fn()

    def predict(rows):
        import tensorflow as tf
        from pyspark import Row
        from pyspark.ml.linalg import DenseVector, SparseVector

        k = keras_utils.keras()
        k.backend.set_floatx(floatx)

        # Do not use GPUs for prediction; use a single CPU core per task.
        pin_cpu(tf, k)
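
        # custom_object_scope makes user-provided layers, metrics, and losses
        # visible to Keras while the model is being loaded.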
        def load_model_fn(x):
            with k.utils.custom_object_scope(custom_objects):
                return k.models.load_model(x)

        model = keras_utils.deserialize_model(serialized_model,
                                              load_model_fn=load_model_fn)
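
        # Replace unknown (None) dimensions with -1 so numpy's reshape can
        # infer them, e.g. a (None, 8) input becomes [-1, 8].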
        input_shapes = [[dim if dim else -1 for dim in input.shape.as_list()]
                        for input in model.inputs]
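
        # Feature columns may hold Spark ML vectors or plain lists/scalars;
        # normalize them all to numpy arrays.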
        def to_array(item):
            if type(item) in [DenseVector, SparseVector]:
                return item.toArray()
            else:
                return np.array(item)

        def to_numpy(item):
            # Some versions of TensorFlow will return an EagerTensor.
            return item.numpy() if hasattr(item, 'numpy') else item

        # Perform predictions.
        for row in rows:
            fields = row.asDict().copy()
            preds = model.predict_on_batch(
                [to_array(row[feature_cols[i]]).reshape(input_shapes[i])
                 for i in range(len(feature_cols))])
            preds = [to_numpy(item) for item in preds]
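
            # Map each model output back onto its output column, converting the
            # prediction to the Spark type recorded for the matching label.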
            for label_col, output_col, pred in zip(label_cols, output_cols, preds):
                meta = metadata[label_col]
                col_type = meta['spark_data_type']
                # dtype for DenseVector and SparseVector is always np.float64.
                if col_type == DenseVector:
                    shape = np.prod(pred.shape)
                    flattened_pred = pred.reshape(shape,)
                    field = DenseVector(flattened_pred)
                elif col_type == SparseVector:
                    shape = meta['shape']
                    flattened_pred = pred.reshape(shape,)
                    nonzero_indices = flattened_pred.nonzero()[0]
                    field = SparseVector(shape, nonzero_indices,
                                         flattened_pred[nonzero_indices])
                else:
                    # The column is a scalar type: int, float, etc.
                    value = pred[0]
                    python_type = util.spark_scalar_to_python_type(col_type)
                    if issubclass(python_type, numbers.Integral):
                        value = round(value)
                    field = python_type(value)

                fields[output_col] = field

            yield Row(**fields)

    return df.rdd.mapPartitions(predict).toDF()
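
For context, this method is reached through Horovod's Spark Estimator API:
KerasEstimator.fit() returns a KerasModel, and calling transform() on it runs
the _transform above on the executors. A minimal sketch, assuming an existing
compiled Keras model `model` and Spark DataFrames `train_df`/`test_df` with
'features' and 'y' columns; the path, column names, and hyperparameters are
illustrative placeholders:

import tensorflow as tf
from horovod.spark.common.store import Store
from horovod.spark.keras import KerasEstimator

# Staging area for intermediate data and checkpoints (path is illustrative).
store = Store.create('/tmp/horovod-work')

estimator = KerasEstimator(num_proc=4,
                           store=store,
                           model=model,  # assumed: a compiled Keras model
                           optimizer=tf.keras.optimizers.Adam(0.001),
                           loss='mse',
                           feature_cols=['features'],
                           label_cols=['y'],
                           batch_size=32,
                           epochs=5)

# fit() trains with Horovod and returns a KerasModel Transformer; transform()
# invokes the _transform() shown above, appending a 'y_pred' column.
keras_model = estimator.fit(train_df).setOutputCols(['y_pred'])
pred_df = keras_model.transform(test_df)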