in horovod/spark/torch/estimator.py
def _transform(self, df):
    model_pre_predict = self.getModel()
    model_pre_predict.eval()

    # `torch`, `np` (numpy), `numbers`, `util`, and the (de)serialization
    # helpers come from the module's top-level imports. Serializing the model
    # here lets the closure below ship it to the Spark executors.
    deserialize = deserialize_fn()
    serialize = serialize_fn()
    serialized_model = serialize(model_pre_predict)

    input_shapes = self.getInputShapes()
    label_cols = self.getLabelColumns()
    output_cols = self.getOutputCols()
    feature_cols = self.getFeatureColumns()
    metadata = self._get_metadata()

    def predict(rows):
        from pyspark import Row
        from pyspark.ml.linalg import DenseVector, SparseVector

        # Deserialize the model once per partition, then run inference row by row.
        model = deserialize(serialized_model)
        for row in rows:
            fields = row.asDict().copy()

            # Note: if the col is a SparseVector, torch.tensor(col) correctly
            # converts it to a dense torch tensor.
            data = [torch.tensor([row[col]]).reshape(shape)
                    for col, shape in zip(feature_cols, input_shapes)]

            with torch.no_grad():
                preds = model(*data)

            if not isinstance(preds, (list, tuple)):
                preds = [preds]

            for label_col, output_col, pred in zip(label_cols, output_cols, preds):
                meta = metadata[label_col]
                col_type = meta['spark_data_type']
                # dtype for DenseVector and SparseVector outputs is always np.float64
                if col_type == DenseVector:
                    shape = np.prod(pred.shape)
                    flattened_pred = pred.reshape(shape)
                    field = DenseVector(flattened_pred)
                elif col_type == SparseVector:
                    shape = meta['shape']
                    flattened_pred = pred.reshape(shape)
                    # as_tuple=True gives numpy-style per-dimension indices;
                    # plain tensor.nonzero()[0] would return only the first
                    # nonzero position of a torch tensor.
                    nonzero_indices = flattened_pred.nonzero(as_tuple=True)[0]
                    field = SparseVector(shape, nonzero_indices,
                                         flattened_pred[nonzero_indices])
                elif pred.shape.numel() == 1:
                    # The output column is a scalar type: int, float, etc.
                    value = pred.item()
                    python_type = util.spark_scalar_to_python_type(col_type)
                    if issubclass(python_type, numbers.Integral):
                        value = round(value)
                    field = python_type(value)
                else:
                    field = DenseVector(pred.reshape(-1))

                fields[output_col] = field

            yield Row(**fields)

    return df.rdd.mapPartitions(predict).toDF()
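
For context, a minimal usage sketch of how this method gets invoked. The model, DataFrames, and estimator arguments below (net, train_df, test_df, the LocalStore path, and the specific hyperparameters) are illustrative assumptions, not taken from this file: calling transform() on a fitted TorchModel is what ultimately dispatches to the _transform(df) shown above.

import torch
import torch.nn.functional as F
from horovod.spark.common.store import LocalStore
from horovod.spark.torch import TorchEstimator

net = torch.nn.Linear(4, 1)                  # toy model; stands in for a real network

store = LocalStore('/tmp/horovod')           # checkpoint / intermediate data location (assumed path)
estimator = TorchEstimator(
    num_proc=2,
    store=store,
    model=net,
    optimizer=torch.optim.SGD(net.parameters(), lr=0.01),
    loss=lambda input, target: F.mse_loss(input, target),
    input_shapes=[[-1, 4]],                  # one shape per entry in feature_cols
    feature_cols=['features'],
    label_cols=['label'],
    batch_size=32,
    epochs=5)

torch_model = estimator.fit(train_df)        # train_df: a Spark DataFrame (assumed)
pred_df = torch_model.transform(test_df)     # runs the predict() closure defined above

Note the design choice in _transform: because inference goes through mapPartitions, the serialized model is deserialized once per partition rather than once per row, which keeps per-row overhead low on large DataFrames.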