public void processElement()

in beam-collector/src/main/java/com/google/collector/clustering/CreateConstellation.java [36:74]


  /**
   * Collapses all reports grouped under one key into a single {@code Constellation} and emits it.
   *
   * <p>The emitted constellation uses the element key as both its id and its metadata name. The
   * report with the greatest {@code getReportTime()} is used as the sample (the first one seen
   * wins on ties); its time becomes the latest-seen timestamp and it is the input to
   * {@code determineFeature} and {@code determineRefactoring}. Report counts are summed and the
   * distinct report checksums are attached to the metadata.
   *
   * <p>Nothing is emitted when the element is {@code null}, or when its value is {@code null} or
   * contains no reports.
   *
   * @param ctx context supplying the grouped element and receiving the output
   */
  public void processElement(ProcessContext ctx) {
    KV<String, Iterable<SecurityReport>> element = ctx.element();
    if (element == null) {
      return;
    }

    Iterable<SecurityReport> reports = element.getValue();
    if (reports == null) {
      return;
    }

    // Traverse the grouped values exactly once. Probing with extra iterator() calls just to test
    // for emptiness or to grab the first element creates additional iterators, which may be
    // costly for lazily backed iterables and is unsafe for ones that cannot be replayed.
    SecurityReport sample = null;
    long totalCount = 0;
    ImmutableSet.Builder<String> checksums = ImmutableSet.builder();
    for (SecurityReport securityReport : reports) {
      // Always pick the latest report; strict '>' keeps the earliest-seen report on ties.
      if (sample == null || securityReport.getReportTime() > sample.getReportTime()) {
        sample = securityReport;
      }

      totalCount += securityReport.getReportCount();
      checksums.add(securityReport.getReportChecksum());
    }

    // The loop never ran: there were no reports for this key, so there is nothing to emit.
    if (sample == null) {
      return;
    }

    String key = element.getKey();
    ConstellationMetadata metadata = ConstellationMetadata.newBuilder()
        .setReportCount(totalCount)
        .setLatestSeenTimestamp(sample.getReportTime())
        .setName(key)
        .setSample(sample)
        .addAllReports(checksums.build())
        .build();

    ctx.output(Constellation.newBuilder()
        .setId(key)
        .setFeature(determineFeature(sample))
        .setRefactoring(determineRefactoring(sample))
        .setMetadata(metadata)
        .build());
  }