public PCollection expand()

in ingestion-beam/src/main/java/com/mozilla/telemetry/contextualservices/EmitCounters.java [26:55]


  public PCollection<SponsoredInteraction> expand(PCollection<SponsoredInteraction> messages) {
    // Pass-through step: every interaction is returned unchanged; the mapping function only
    // increments per-document-type counters along the way.
    return messages.apply(MapElements.into(TypeDescriptor.of(SponsoredInteraction.class))
        .via((SponsoredInteraction interaction) -> {
          // PerDocTypeCounter.inc takes an attribute map, so build one from the
          // interaction's own fields.
          final Map<String, String> counterAttributes = new HashMap<>();

          // Use the original document type when it is set, otherwise the derived one.
          final String originalDocType = interaction.getOriginalDocType();
          counterAttributes.put(Attribute.DOCUMENT_TYPE,
              originalDocType != null ? originalDocType : interaction.getDerivedDocumentType());

          // The namespace attribute is only added when the interaction carries one.
          final String originalNamespace = interaction.getOriginalNamespace();
          if (originalNamespace != null) {
            counterAttributes.put(Attribute.DOCUMENT_NAMESPACE, originalNamespace);
          }

          // Impressions and clicks each have their own counter; any other interaction
          // type (including null) has none.
          final String interactionType = interaction.getInteractionType();
          final String urlCounterName;
          if (SponsoredInteraction.INTERACTION_IMPRESSION.equals(interactionType)) {
            urlCounterName = "valid_impression_url";
          } else if (SponsoredInteraction.INTERACTION_CLICK.equals(interactionType)) {
            urlCounterName = "valid_click_url";
          } else {
            urlCounterName = null;
          }
          if (urlCounterName != null) {
            PerDocTypeCounter.inc(counterAttributes, urlCounterName);
          }

          // Every interaction counts as a valid submission; those with a request id are
          // additionally counted under the merino counter.
          PerDocTypeCounter.inc(counterAttributes, "valid_submission");
          if (interaction.getRequestId() != null) {
            PerDocTypeCounter.inc(counterAttributes, "valid_submission_merino");
          }

          return interaction;
        }));
  }