public PubsubMessage apply()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ExtractIpFromLogEntry.java [41:87]


    public PubsubMessage apply(PubsubMessage message) {
      if (message.getAttribute(Attribute.SUBMISSION_TIMESTAMP) != null) {
        // If this message has submission_timestamp set, it is a normal message from the edge
        // server rather than a LogEntry from Cloud Logging, so we return immediately.
        return message;
      }

      if (message.getAttributeMap().containsKey(Attribute.X_FORWARDED_FOR)) {
        // Return early since IP has been extracted
        countIpAlreadyExtracted.inc();
        return message;
      }

      try {
        ObjectNode logEntry;
        final Map<String, String> attributes = new HashMap<>(message.getAttributeMap());

        logEntry = Json.readObjectNode(message.getPayload());
        JsonNode maybeJsonPayloadNode = logEntry.path("jsonPayload");
        if (maybeJsonPayloadNode.isObject()) {
          ObjectNode jsonPayload = (ObjectNode) maybeJsonPayloadNode;
          JsonNode maybeFieldsNode = jsonPayload.path("Fields");
          if (maybeFieldsNode.isObject()) {
            ObjectNode fields = (ObjectNode) maybeFieldsNode;
            JsonNode ipAddress = fields.remove("ip_address");
            if (ipAddress != null) {
              // if ipAddress is null, it means it wasn't present in the payload
              String clientIpAddress = ipAddress.textValue();
              attributes.put(Attribute.X_FORWARDED_FOR, clientIpAddress);
              countIpExtracted.inc();

              jsonPayload.replace("Fields", fields);
              logEntry.replace("jsonPayload", jsonPayload);
              byte[] sanitizedPayload = logEntry.toString().getBytes(StandardCharsets.UTF_8);

              return new PubsubMessage(sanitizedPayload, attributes);
            }
          }
        }
        return message;

      } catch (IOException e) {
        throw new ParseLogEntry.InvalidLogEntryException(
            "Message has no submission_timestamp but could not be parsed as LogEntry JSON");
      }

    }