in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParseUri.java [128:167]
private static PubsubMessage applyFailures(PubsubMessage message) {
message = PubsubConstraints.ensureNonNull(message);
final String uri = message.getAttribute(Attribute.URI);
// Throws an exception on certain URI patterns to signal that the message should be rejected.
MessageScrubber.scrubByUri(uri);
if (uri == null) {
// We should only have a missing uri attribute if we're replaying messages from decoded
// payloads in which case they already have parsed URI attributes encoded in the payload
// and these will be recovered in ParsePayload.
return message;
// nothing to add here
} else if (uri.startsWith(StubUri.PREFIX)) {
try {
// create new message with updated payload
message = new PubsubMessage(StubUri.parse(uri), message.getAttributeMap());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else if (uri.startsWith(TELEMETRY_URI_PREFIX)) {
// induce failures previously ignored in ParseUri::apply
zip(TELEMETRY_URI_SUFFIX_ELEMENTS, uri.substring(TELEMETRY_URI_PREFIX.length()).split("/"));
} else if (uri.startsWith(GENERIC_URI_PREFIX)) {
// induce failures previously ignored in ParseUri::apply
zip(GENERIC_URI_SUFFIX_ELEMENTS, uri.substring(GENERIC_URI_PREFIX.length()).split("/"));
} else {
throw new InvalidUriException("Unknown URI prefix");
}
// Validate attributes
try {
UUID.fromString(message.getAttribute(Attribute.DOCUMENT_ID));
} catch (IllegalArgumentException e) {
throw new InvalidUriException("Entry in document_id place is not a valid UUID");
}
return message;
}