ingestion-beam/src/main/java/com/mozilla/telemetry/amplitude/FilterByDocType.java [42:73]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private Set<String> parseAllowlistString(String allowlistString, String argument) {
    if (allowlistString == null) {
      throw new IllegalArgumentException(
          String.format("Required --%s argument not found", argument));
    }
    return Arrays.stream(allowlistString.split(",")).filter(StringUtils::isNotBlank)
        .collect(Collectors.toSet());
  }

  @Override
  public PCollection<PubsubMessage> expand(PCollection<PubsubMessage> input) {
    return input.apply(ParDo.of(new Fn()));
  }

  private class Fn extends DoFn<PubsubMessage, PubsubMessage> {

    @ProcessElement
    public void processElement(@Element PubsubMessage message, OutputReceiver<PubsubMessage> out) {
      message = PubsubConstraints.ensureNonNull(message);
      if (allowedNamespacesSet == null || allowedDocTypesSet == null) {
        allowedDocTypesSet = parseAllowlistString(allowedDocTypes, "allowedDocTypes");
        allowedNamespacesSet = parseAllowlistString(allowedNamespaces, "allowedNamespaces");
      }

      final String namespace = message.getAttribute(Attribute.DOCUMENT_NAMESPACE);
      final String doctype = message.getAttribute(Attribute.DOCUMENT_TYPE);

      if (!allowedNamespacesSet.contains(namespace) || !allowedDocTypesSet.contains(doctype)) {
        PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_rejected");
        return;
      }
      PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_passed");
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



ingestion-beam/src/main/java/com/mozilla/telemetry/contextualservices/FilterByDocType.java [46:77]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  private Set<String> parseAllowlistString(String allowlistString, String argument) {
    if (allowlistString == null) {
      throw new IllegalArgumentException(
          String.format("Required --%s argument not found", argument));
    }
    return Arrays.stream(allowlistString.split(",")).filter(StringUtils::isNotBlank)
        .collect(Collectors.toSet());
  }

  @Override
  public PCollection<PubsubMessage> expand(PCollection<PubsubMessage> input) {
    return input.apply(ParDo.of(new Fn()));
  }

  private class Fn extends DoFn<PubsubMessage, PubsubMessage> {

    @ProcessElement
    public void processElement(@Element PubsubMessage message, OutputReceiver<PubsubMessage> out) {
      message = PubsubConstraints.ensureNonNull(message);
      if (allowedNamespacesSet == null || allowedDocTypesSet == null) {
        allowedDocTypesSet = parseAllowlistString(allowedDocTypes, "allowedDocTypes");
        allowedNamespacesSet = parseAllowlistString(allowedNamespaces, "allowedNamespaces");
      }

      final String namespace = message.getAttribute(Attribute.DOCUMENT_NAMESPACE);
      final String doctype = message.getAttribute(Attribute.DOCUMENT_TYPE);

      if (!allowedNamespacesSet.contains(namespace) || !allowedDocTypesSet.contains(doctype)) {
        PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_rejected");
        return;
      }
      PerDocTypeCounter.inc(message.getAttributeMap(), "doctype_filter_passed");
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



