private static PubsubMessage apply()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParseUri.java [70:126]


  /**
   * Derives document attributes from the message's {@code uri} attribute.
   *
   * <p>The input is returned as-is (after {@code PubsubConstraints.ensureNonNull}) when the
   * {@code uri} attribute is absent or does not start with any of the recognized prefixes.
   * Otherwise a new message is built that carries the same payload and a copy of the attributes,
   * extended with the values derived from the URI and with the message ID attribute removed.
   *
   * @param message the incoming message; normalized via {@code PubsubConstraints.ensureNonNull}
   * @return the normalized input message, or a new message with URI-derived attributes
   */
  private static PubsubMessage apply(PubsubMessage message) {
    message = PubsubConstraints.ensureNonNull(message);

    // Work on a mutable copy so the attribute map of the incoming message is left untouched.
    final Map<String, String> attrs = new HashMap<>(message.getAttributeMap());
    final String uri = attrs.get(Attribute.URI);

    if (uri == null) {
      // A missing uri attribute is only expected when replaying messages from decoded payloads;
      // in that case the parsed URI attributes are already encoded in the payload and get
      // recovered in ParsePayload.
      return message;
    }

    if (uri.startsWith(TELEMETRY_URI_PREFIX)) {
      // The version field is not accessible yet, so populating the document_version attribute
      // is postponed to the ParsePayload step, which has map-like access to the JSON content.
      final String[] pathElements = uri.substring(TELEMETRY_URI_PREFIX.length()).split("/");
      try {
        attrs.putAll(zip(TELEMETRY_URI_SUFFIX_ELEMENTS, pathElements));
        // Reached only when zip succeeded, so the namespace is never set for a malformed path.
        attrs.put(Attribute.DOCUMENT_NAMESPACE, TELEMETRY);
      } catch (UnexpectedPathElementsException ignored) {
        // defer exception to ParseUri::applyFailures
      }
    } else if (uri.startsWith(GENERIC_URI_PREFIX)) {
      final String[] pathElements = uri.substring(GENERIC_URI_PREFIX.length()).split("/");
      try {
        attrs.putAll(zip(GENERIC_URI_SUFFIX_ELEMENTS, pathElements));
      } catch (UnexpectedPathElementsException ignored) {
        // defer exception to ParseUri::applyFailures
      }
    } else if (uri.startsWith(StubUri.PREFIX)) {
      attrs.put(Attribute.DOCUMENT_NAMESPACE, "firefox-installer");
      attrs.put(Attribute.DOCUMENT_TYPE, "install");
      attrs.put(Attribute.DOCUMENT_VERSION, "1");

      // With a PubSub message ID available, the document ID is a name-based (V3) UUID computed
      // from it using a null namespace (see https://stackoverflow.com/a/55296637); without one,
      // a random UUID is used instead.
      final String messageId = attrs.get(Attribute.MESSAGE_ID);
      final UUID documentId = messageId == null
          ? UUID.randomUUID()
          : UUID.nameUUIDFromBytes(messageId.getBytes(StandardCharsets.UTF_8));
      attrs.put(Attribute.DOCUMENT_ID, documentId.toString().toLowerCase());
    } else {
      // Unrecognized prefix: hand the message back without touching its attributes.
      return message;
    }

    // The message ID only serves to generate the document ID and is not used any further,
    // so it is dropped from the outgoing attributes.
    attrs.remove(Attribute.MESSAGE_ID);

    return new PubsubMessage(message.getPayload(), attrs);
  }