in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParseUri.java [70:126]
/**
 * Derives document attributes from the {@code uri} attribute of a Pub/Sub message.
 *
 * <p>The URI prefix selects one of three layouts:
 * <ul>
 *   <li>{@code TELEMETRY_URI_PREFIX}: the remaining path segments are zipped with
 *       {@code TELEMETRY_URI_SUFFIX_ELEMENTS}. On success, the namespace is set to
 *       {@code TELEMETRY}.</li>
 *   <li>{@code GENERIC_URI_PREFIX}: the remaining path segments are zipped with
 *       {@code GENERIC_URI_SUFFIX_ELEMENTS}.</li>
 *   <li>{@code StubUri.PREFIX}: fixed namespace/type/version values are assigned. A document ID
 *       is derived from the message ID attribute, or generated randomly when that attribute is
 *       absent.</li>
 * </ul>
 *
 * <p>An {@code UnexpectedPathElementsException} raised while zipping segments is swallowed here.
 * The failure is deferred to ParseUri::applyFailures.
 *
 * @param message the incoming message; it is first normalized with
 *     {@code PubsubConstraints.ensureNonNull}
 * @return the normalized message itself when it has no uri attribute or the prefix is not
 *     recognized. Otherwise, a new message with the same payload, the parsed attributes added,
 *     and the message ID attribute removed.
 */
private static PubsubMessage apply(PubsubMessage message) {
  message = PubsubConstraints.ensureNonNull(message);
  // Mutate a copy so the attribute map of the incoming message is left untouched.
  final Map<String, String> attributes = new HashMap<>(message.getAttributeMap());
  final String uri = attributes.get(Attribute.URI);

  // A missing uri should only happen when replaying messages built from decoded payloads.
  // Those already carry the parsed URI attributes inside the payload, and ParsePayload
  // recovers them.
  if (uri == null) {
    return message;
  }

  if (uri.startsWith(TELEMETRY_URI_PREFIX)) {
    final String[] segments = uri.substring(TELEMETRY_URI_PREFIX.length()).split("/");
    try {
      attributes.putAll(zip(TELEMETRY_URI_SUFFIX_ELEMENTS, segments));
      // Only reached when zip did not throw, so a malformed path never receives a namespace.
      // document_version is deliberately left for ParsePayload, which has map-like access to
      // the JSON content where the version field lives.
      attributes.put(Attribute.DOCUMENT_NAMESPACE, TELEMETRY);
    } catch (UnexpectedPathElementsException ignored) {
      // Reported later by ParseUri::applyFailures.
    }
  } else if (uri.startsWith(GENERIC_URI_PREFIX)) {
    final String[] segments = uri.substring(GENERIC_URI_PREFIX.length()).split("/");
    try {
      attributes.putAll(zip(GENERIC_URI_SUFFIX_ELEMENTS, segments));
    } catch (UnexpectedPathElementsException ignored) {
      // Reported later by ParseUri::applyFailures.
    }
  } else if (uri.startsWith(StubUri.PREFIX)) {
    final String messageId = attributes.get(Attribute.MESSAGE_ID);
    // With a Pub/Sub message ID available, build a type 3 (name-based) UUID from its UTF-8
    // bytes, so the same message ID always maps to the same document ID. Without one, fall
    // back to a random UUID. See https://stackoverflow.com/a/55296637
    final UUID documentId = (messageId == null)
        ? UUID.randomUUID()
        : UUID.nameUUIDFromBytes(messageId.getBytes(StandardCharsets.UTF_8));
    attributes.put(Attribute.DOCUMENT_NAMESPACE, "firefox-installer");
    attributes.put(Attribute.DOCUMENT_TYPE, "install");
    attributes.put(Attribute.DOCUMENT_VERSION, "1");
    attributes.put(Attribute.DOCUMENT_ID, documentId.toString().toLowerCase());
  } else {
    // Unrecognized prefix: pass the normalized message through unchanged.
    return message;
  }

  // The message ID is only consumed to derive the document ID above; drop it from the output.
  attributes.remove(Attribute.MESSAGE_ID);
  return new PubsubMessage(message.getPayload(), attributes);
}