private static PubsubMessage applyFailures()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParseUri.java [128:167]


  private static PubsubMessage applyFailures(PubsubMessage message) {
    message = PubsubConstraints.ensureNonNull(message);

    final String uri = message.getAttribute(Attribute.URI);

    // Throws an exception on certain URI patterns to signal that the message should be rejected.
    MessageScrubber.scrubByUri(uri);

    if (uri == null) {
      // We should only have a missing uri attribute if we're replaying messages from decoded
      // payloads in which case they already have parsed URI attributes encoded in the payload
      // and these will be recovered in ParsePayload.
      return message;
      // nothing to add here
    } else if (uri.startsWith(StubUri.PREFIX)) {
      try {
        // create new message with updated payload
        message = new PubsubMessage(StubUri.parse(uri), message.getAttributeMap());
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    } else if (uri.startsWith(TELEMETRY_URI_PREFIX)) {
      // induce failures previously ignored in ParseUri::apply
      zip(TELEMETRY_URI_SUFFIX_ELEMENTS, uri.substring(TELEMETRY_URI_PREFIX.length()).split("/"));
    } else if (uri.startsWith(GENERIC_URI_PREFIX)) {
      // induce failures previously ignored in ParseUri::apply
      zip(GENERIC_URI_SUFFIX_ELEMENTS, uri.substring(GENERIC_URI_PREFIX.length()).split("/"));
    } else {
      throw new InvalidUriException("Unknown URI prefix");
    }

    // Validate attributes
    try {
      UUID.fromString(message.getAttribute(Attribute.DOCUMENT_ID));
    } catch (IllegalArgumentException e) {
      throw new InvalidUriException("Entry in document_id place is not a valid UUID");
    }

    return message;
  }