public void processElement()

in ingestion-beam/src/main/java/com/mozilla/telemetry/contextualservices/FilterByDocType.java [63:121]


    public void processElement(@Element PubsubMessage message, OutputReceiver<PubsubMessage> out) {
      message = PubsubConstraints.ensureNonNull(message);
      if (allowedNamespacesSet == null || allowedDocTypesSet == null) {
        allowedDocTypesSet = parseAllowlistString(allowedDocTypes, "allowedDocTypes");
        allowedNamespacesSet = parseAllowlistString(allowedNamespaces, "allowedNamespaces");
      }

      final String namespace = message.getAttribute(Attribute.DOCUMENT_NAMESPACE);
      final String doctype = message.getAttribute(Attribute.DOCUMENT_TYPE);

      if (!allowedNamespacesSet.contains(namespace) || !allowedDocTypesSet.contains(doctype)) {
        PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_rejected");
        return;
      }
      PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_passed");

      // Special handling for desktop pings.
      if ("contextual-services".equals(namespace) || "firefox-desktop".equals(namespace)) {
        // Verify Firefox version here so rejected messages don't go to error output
        final int minVersion;
        int maxVersion = Integer.MAX_VALUE;
        final boolean isLegacyDesktop;
        if (doctype.startsWith("topsites-")) {
          minVersion = 87;
          isLegacyDesktop = true;
        } else if (doctype.startsWith("quicksuggest-")) {
          minVersion = 89;
          isLegacyDesktop = true;
        } else if ("top-sites".equals(doctype)) {
          minVersion = 116;
          maxVersion = 136;
          isLegacyDesktop = false;
        } else if ("quick-suggest".equals(doctype)) {
          minVersion = 116;
          isLegacyDesktop = false;
        } else if ("search-with".equals(doctype)) {
          minVersion = 122;
          isLegacyDesktop = false;
        } else {
          PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_unhandled");
          return; // drop message
        }
        final String version = message.getAttribute(Attribute.USER_AGENT_VERSION);
        try {
          if (version == null || minVersion > Integer.parseInt(version)
              || maxVersion < Integer.parseInt(version) || (limitLegacyDesktopVersion
                  && isLegacyDesktop && 116 <= Integer.parseInt(version))) {
            PerDocTypeCounter.inc(message.getAttributeMap(), "version_filter_rejected");
            return; // drop message
          }
        } catch (NumberFormatException e) {
          PerDocTypeCounter.inc(message.getAttributeMap(), "version_filter_invalid");
          return; // drop message
        }
      }
      PerDocTypeCounter.inc(message.getAttributeMap(), "version_filter_passed");

      out.output(message);
    }