in flink-ml-lib/src/main/java/org/apache/flink/ml/feature/standardscaler/StandardScalerModel.java [112:139]
public Row map(Row dataPoint) {
if (mean == null) {
StandardScalerModelData modelData =
(StandardScalerModelData)
getRuntimeContext().getBroadcastVariable(broadcastModelKey).get(0);
mean = modelData.mean;
DenseVector std = modelData.std;
if (withStd) {
scale = std;
double[] scaleValues = scale.values;
for (int i = 0; i < scaleValues.length; i++) {
scaleValues[i] = scaleValues[i] == 0 ? 0 : 1 / scaleValues[i];
}
}
}
Vector outputVec = ((Vector) (dataPoint.getField(inputCol))).clone();
if (withMean) {
outputVec = outputVec.toDense();
BLAS.axpy(-1, mean, (DenseVector) outputVec);
}
if (withStd) {
BLAS.hDot(scale, outputVec);
}
return RowUtils.append(dataPoint, outputVec);
}