in ingestion-beam/src/main/java/com/mozilla/telemetry/contextualservices/FilterByDocType.java [63:121]
public void processElement(@Element PubsubMessage message, OutputReceiver<PubsubMessage> out) {
  // Emits the message only when its namespace and doctype are both allowlisted and, for the
  // two desktop namespaces, its user agent version is inside the range accepted for the
  // doctype. Any other message is dropped (nothing is emitted) after a counter named for the
  // reason has been incremented.
  message = PubsubConstraints.ensureNonNull(message);

  // The allowlist sets are parsed from their string form the first time they are needed.
  if (allowedNamespacesSet == null || allowedDocTypesSet == null) {
    allowedDocTypesSet = parseAllowlistString(allowedDocTypes, "allowedDocTypes");
    allowedNamespacesSet = parseAllowlistString(allowedNamespaces, "allowedNamespaces");
  }

  final String namespace = message.getAttribute(Attribute.DOCUMENT_NAMESPACE);
  final String doctype = message.getAttribute(Attribute.DOCUMENT_TYPE);

  // Both attributes must be allowlisted; the doctype is only looked up if the namespace is.
  final boolean allowlisted =
      allowedNamespacesSet.contains(namespace) && allowedDocTypesSet.contains(doctype);
  if (!allowlisted) {
    PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_rejected");
    return;
  }
  PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_passed");

  // Desktop pings get an additional version check. It is done here so that messages
  // rejected on version are dropped instead of going to error output.
  final boolean desktopNamespace =
      "contextual-services".equals(namespace) || "firefox-desktop".equals(namespace);
  if (desktopNamespace) {
    // Legacy desktop doctypes are recognized by prefix; the others by exact name.
    final boolean legacyTopsites = doctype.startsWith("topsites-");
    final boolean isLegacyDesktop = legacyTopsites || doctype.startsWith("quicksuggest-");

    // Inclusive bounds on the accepted version for this doctype.
    final int oldestAccepted;
    final int newestAccepted;
    if (isLegacyDesktop) {
      oldestAccepted = legacyTopsites ? 87 : 89;
      newestAccepted = Integer.MAX_VALUE;
    } else {
      switch (doctype) {
        case "top-sites":
          oldestAccepted = 116;
          newestAccepted = 136;
          break;
        case "quick-suggest":
          oldestAccepted = 116;
          newestAccepted = Integer.MAX_VALUE;
          break;
        case "search-with":
          oldestAccepted = 122;
          newestAccepted = Integer.MAX_VALUE;
          break;
        default:
          // Allowlisted doctype in a desktop namespace that has no version rule here.
          PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_unhandled");
          return; // drop message
      }
    }

    final String rawVersion = message.getAttribute(Attribute.USER_AGENT_VERSION);
    if (rawVersion == null) {
      // A missing version is counted as a rejection, not as an invalid version.
      PerDocTypeCounter.inc(message.getAttributeMap(), "version_filter_rejected");
      return; // drop message
    }

    final int parsedVersion;
    try {
      parsedVersion = Integer.parseInt(rawVersion);
    } catch (NumberFormatException e) {
      PerDocTypeCounter.inc(message.getAttributeMap(), "version_filter_invalid");
      return; // drop message
    }

    // Reject versions outside [oldestAccepted, newestAccepted]. When
    // limitLegacyDesktopVersion is set, legacy doctypes are also rejected from 116 onward.
    if (parsedVersion < oldestAccepted || parsedVersion > newestAccepted
        || (limitLegacyDesktopVersion && isLegacyDesktop && parsedVersion >= 116)) {
      PerDocTypeCounter.inc(message.getAttributeMap(), "version_filter_rejected");
      return; // drop message
    }
  }

  // Reached by desktop messages that passed the version check and by every allowlisted
  // message from a non-desktop namespace, which skips the version check entirely.
  PerDocTypeCounter.inc(message.getAttributeMap(), "version_filter_passed");
  out.output(message);
}