in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ExtractIpFromLogEntry.java [41:87]
public PubsubMessage apply(PubsubMessage message) {
if (message.getAttribute(Attribute.SUBMISSION_TIMESTAMP) != null) {
// If this message has submission_timestamp set, it is a normal message from the edge
// server rather than a LogEntry from Cloud Logging, so we return immediately.
return message;
}
if (message.getAttributeMap().containsKey(Attribute.X_FORWARDED_FOR)) {
// Return early since IP has been extracted
countIpAlreadyExtracted.inc();
return message;
}
try {
ObjectNode logEntry;
final Map<String, String> attributes = new HashMap<>(message.getAttributeMap());
logEntry = Json.readObjectNode(message.getPayload());
JsonNode maybeJsonPayloadNode = logEntry.path("jsonPayload");
if (maybeJsonPayloadNode.isObject()) {
ObjectNode jsonPayload = (ObjectNode) maybeJsonPayloadNode;
JsonNode maybeFieldsNode = jsonPayload.path("Fields");
if (maybeFieldsNode.isObject()) {
ObjectNode fields = (ObjectNode) maybeFieldsNode;
JsonNode ipAddress = fields.remove("ip_address");
if (ipAddress != null) {
// if ipAddress is null, it means it wasn't present in the payload
String clientIpAddress = ipAddress.textValue();
attributes.put(Attribute.X_FORWARDED_FOR, clientIpAddress);
countIpExtracted.inc();
jsonPayload.replace("Fields", fields);
logEntry.replace("jsonPayload", jsonPayload);
byte[] sanitizedPayload = logEntry.toString().getBytes(StandardCharsets.UTF_8);
return new PubsubMessage(sanitizedPayload, attributes);
}
}
}
return message;
} catch (IOException e) {
throw new ParseLogEntry.InvalidLogEntryException(
"Message has no submission_timestamp but could not be parsed as LogEntry JSON");
}
}