public void processElement()

in src/main/java/com/google/solutions/df/log/aggregations/common/PredictTransform.java [77:110]


    public void processElement(ProcessContext c) {
      Row aggrData = c.element();
      double[] inputVector = Util.getAggrVector(aggrData);

      CentroidVector centroidVector =
          c.sideInput(centroidFeature).stream()
              .filter(
                  centroid ->
                      centroid.centroidId().intValue()
                          == aggrData.getInt32("centroid_id").intValue())
              .findFirst()
              .orElse(null);

      if (centroidVector != null) {
        double normalizedDistance =
            Util.calculateStdDeviation(
                inputVector,
                centroidVector.featureVectors().stream().mapToDouble(d -> d).toArray());
        boolean outlierFound =
            (normalizedDistance / centroidVector.normalizedDistance()) > THRESH_HOLD_PARAM
                ? true
                : false;
        if (outlierFound) {
          c.output(aggrData);
          LOG.info(
              "*****Outlier Found*****- Centroid ID {}, Normalized Std Dev{}, InputFeature Std Dev {}",
              centroidVector.centroidId(),
              centroidVector.normalizedDistance(),
              normalizedDistance);
        }
      } else {
        LOG.info("Centroid Vector is null for centr {}", aggrData.getInt32("centroid_id"));
      }
    }