in src/main/java/com/google/solutions/df/log/aggregations/common/PredictTransform.java [121:165]
/**
 * Assigns the incoming aggregated row to its closest centroid and emits it as an outlier row.
 *
 * <p>Builds the element's aggregate feature vector via {@code Util.getAggrVector}, computes the
 * Euclidean distance from it to every centroid in the {@code centroidFeature} side input, and
 * outputs a row with schema {@code Util.outlierSchema}: the element's fields followed by the id of
 * the nearest centroid.
 *
 * @param c the process context supplying the element, the centroid side input and the output
 * @throws IllegalStateException if the centroid side input yields no centroids, so no closest
 *     centroid can be chosen
 */
public void processElement(ProcessContext c) {
  Row input = c.element();
  double[] aggrFeatureVector = Util.getAggrVector(input);
  // The distance measure holds no per-call state, so one instance serves every centroid.
  EuclideanDistance euclidean = new EuclideanDistance();
  // centroid_id -> distance from this element's feature vector (a repeated id keeps its last value).
  HashMap<Integer, Double> centroidMap = new HashMap<Integer, Double>();
  c.sideInput(centroidFeature)
      .forEach(
          feature -> {
            double distanceFromCentroid =
                euclidean.compute(
                    aggrFeatureVector,
                    feature.featureVectors().stream().mapToDouble(d -> d).toArray());
            centroidMap.put(feature.centroidId(), distanceFromCentroid);
            LOG.debug(
                "Centroid_id {} , distance {}", feature.centroidId(), distanceFromCentroid);
          });
  if (centroidMap.isEmpty()) {
    // Collections.min would throw an opaque NoSuchElementException; fail with a clear reason.
    throw new IllegalStateException(
        "No centroids available in side input; cannot determine closest centroid");
  }
  // Entry with the smallest distance: key = centroid id, value = distance.
  Entry<Integer, Double> closestCentroid =
      Collections.min(centroidMap.entrySet(), Comparator.comparing(Entry::getValue));
  LOG.debug(
      "****closest centroid {}, distance {}",
      closestCentroid.getKey(),
      closestCentroid.getValue());
  c.output(
      Row.withSchema(Util.outlierSchema)
          .addValues(
              input.getString("subscriber_id"),
              input.getString("dst_subnet"),
              input.getString("transaction_time"),
              input.getInt32("number_of_unique_ips"),
              input.getInt32("number_of_unique_ports"),
              input.getInt32("number_of_records"),
              input.getInt32("max_tx_bytes"),
              input.getInt32("min_tx_bytes"),
              input.getDouble("avg_tx_bytes"),
              input.getInt32("max_rx_bytes"),
              input.getInt32("min_rx_bytes"),
              input.getDouble("avg_rx_bytes"),
              input.getInt32("max_duration"),
              input.getInt32("min_duration"),
              input.getDouble("avg_duration"),
              closestCentroid.getKey())
          .build());
}