in src/main/java/com/google/solutions/df/log/aggregations/common/PredictTransform.java [77:110]
public void processElement(ProcessContext c) {
  // Finds the centroid in the centroidFeature side input whose id equals this row's
  // "centroid_id", computes Util.calculateStdDeviation between the row's aggregate vector and
  // the centroid's feature vector, and outputs the row when the ratio of that value to the
  // centroid's normalizedDistance() is greater than THRESH_HOLD_PARAM. Rows with no matching
  // centroid are logged and not output.
  Row aggrData = c.element();
  double[] inputVector = Util.getAggrVector(aggrData);

  // Read the id once rather than once per centroid inside the filter. The boxed value is kept
  // as-is so that a null id falls through to the "no centroid" branch below instead of throwing
  // a NullPointerException while unboxing.
  // NOTE(review): assumes a null "centroid_id" should be treated as "no match" — confirm.
  Integer centroidId = aggrData.getInt32("centroid_id");

  CentroidVector centroidVector =
      centroidId == null
          ? null
          : c.sideInput(centroidFeature).stream()
              .filter(centroid -> centroid.centroidId().intValue() == centroidId.intValue())
              .findFirst()
              .orElse(null);

  if (centroidVector != null) {
    double normalizedDistance =
        Util.calculateStdDeviation(
            inputVector,
            centroidVector.featureVectors().stream().mapToDouble(d -> d).toArray());
    // The row is an outlier when its distance, relative to the centroid's own normalized
    // distance, exceeds the configured threshold.
    boolean outlierFound =
        (normalizedDistance / centroidVector.normalizedDistance()) > THRESH_HOLD_PARAM;
    if (outlierFound) {
      c.output(aggrData);
      LOG.info(
          "*****Outlier Found*****- Centroid ID {}, Normalized Std Dev{}, InputFeature Std Dev {}",
          centroidVector.centroidId(),
          centroidVector.normalizedDistance(),
          normalizedDistance);
    }
  } else {
    LOG.info("Centroid Vector is null for centr {}", centroidId);
  }
}