public void processElement()

in src/main/java/com/google/solutions/df/log/aggregations/common/PredictTransform.java [121:165]


    public void processElement(ProcessContext c) {
      HashMap<Integer, Double> centroidMap = new HashMap<Integer, Double>();
      double[] aggrFeatureVector = Util.getAggrVector(c.element());
      c.sideInput(centroidFeature)
          .forEach(
              feature -> {
                Double distanceFromCentroid =
                    new EuclideanDistance()
                        .compute(
                            aggrFeatureVector,
                            feature.featureVectors().stream().mapToDouble(d -> d).toArray());
                centroidMap.put(feature.centroidId(), distanceFromCentroid);

                LOG.debug(
                    "Centroid_id {} , distance {}", feature.centroidId(), distanceFromCentroid);
              });
      // centroid_id,
      Entry<Integer, Double> closestDistance =
          Collections.min(centroidMap.entrySet(), Comparator.comparing(Entry::getValue));

      LOG.debug(
          "****closet distance {}, centroid {}",
          closestDistance.getKey(),
          closestDistance.getValue());
      c.output(
          Row.withSchema(Util.outlierSchema)
              .addValues(
                  c.element().getString("subscriber_id"),
                  c.element().getString("dst_subnet"),
                  c.element().getString("transaction_time"),
                  c.element().getInt32("number_of_unique_ips"),
                  c.element().getInt32("number_of_unique_ports"),
                  c.element().getInt32("number_of_records"),
                  c.element().getInt32("max_tx_bytes"),
                  c.element().getInt32("min_tx_bytes"),
                  c.element().getDouble("avg_tx_bytes"),
                  c.element().getInt32("max_rx_bytes"),
                  c.element().getInt32("min_rx_bytes"),
                  c.element().getDouble("avg_rx_bytes"),
                  c.element().getInt32("max_duration"),
                  c.element().getInt32("min_duration"),
                  c.element().getDouble("avg_duration"),
                  closestDistance.getKey())
              .build());
    }