in ingestion-beam/src/main/java/com/mozilla/telemetry/contextualservices/EmitCounters.java [26:55]
public PCollection<SponsoredInteraction> expand(PCollection<SponsoredInteraction> messages) {
  // Pass-through step: each element is returned unchanged. The mapping function exists only
  // to increment the PerDocTypeCounter counters that describe the element.
  return messages.apply(MapElements.into(TypeDescriptor.of(SponsoredInteraction.class))
      .via((SponsoredInteraction sponsored) -> {
        // PerDocTypeCounter is keyed by an attribute map, so assemble a stand-in map from the
        // fields carried on the interaction itself.
        final Map<String, String> counterAttributes = new HashMap<>();

        // Use the original document type when present; otherwise fall back to the derived one.
        final String reportedDocType = sponsored.getOriginalDocType();
        counterAttributes.put(Attribute.DOCUMENT_TYPE,
            reportedDocType == null ? sponsored.getDerivedDocumentType() : reportedDocType);

        // The namespace attribute is only set when the interaction carries one.
        final String reportedNamespace = sponsored.getOriginalNamespace();
        if (reportedNamespace != null) {
          counterAttributes.put(Attribute.DOCUMENT_NAMESPACE, reportedNamespace);
        }

        // Impressions and clicks each have a dedicated counter; any other interaction type
        // (including null) increments neither of them.
        final String kind = sponsored.getInteractionType();
        if (SponsoredInteraction.INTERACTION_IMPRESSION.equals(kind)) {
          PerDocTypeCounter.inc(counterAttributes, "valid_impression_url");
        } else if (SponsoredInteraction.INTERACTION_CLICK.equals(kind)) {
          PerDocTypeCounter.inc(counterAttributes, "valid_click_url");
        }

        // Every element counts as a valid submission.
        PerDocTypeCounter.inc(counterAttributes, "valid_submission");

        // Elements that carry a request id are additionally counted under the merino counter.
        final boolean hasRequestId = sponsored.getRequestId() != null;
        if (hasRequestId) {
          PerDocTypeCounter.inc(counterAttributes, "valid_submission_merino");
        }

        return sponsored;
      }));
}