in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParseProxy.java [50:104]
// Rewrites the X-Forwarded-For attribute so that trailing proxy entries are dropped, removes
// the unused remote address attribute, and emits a new message carrying the original payload
// together with the cleaned-up copy of the attributes.
//
// rawMessage: incoming message; normalized via PubsubConstraints.ensureNonNull before use.
// out: receiver that is handed the rewritten message.
public void processElement(@Element PubsubMessage rawMessage,
    OutputReceiver<PubsubMessage> out) {
  // Prevent null pointer exception
  final PubsubMessage message = PubsubConstraints.ensureNonNull(rawMessage);
  // Work on a copy so the attribute map of the incoming message is never modified.
  final Map<String, String> attributes = new HashMap<>(message.getAttributeMap());
  Optional.ofNullable(attributes.get(Attribute.X_FORWARDED_FOR))
      // Split on commas (with optional surrounding whitespace) and drop blank entries.
      .map(v -> Arrays.stream(v.split("\\s*,\\s*")).filter(StringUtils::isNotBlank)
          .collect(Collectors.toList()))
      .ifPresent(xff -> {
        // Google's load balancer will append the immediate sending client IP and a global
        // forwarding rule IP to any existing content in X-Forwarded-For as documented in:
        // https://cloud.google.com/load-balancing/docs/https/#components
        //
        // Our nginx setup will then append google's load balancer IP.
        //
        // In practice, many of the "first" addresses are bogus or internal, so remove load
        // balancer entries and downstream transforms will read the immediate sending client
        // IP from the end of the list.
        //
        // Collectors.toList() makes no guarantee that the returned list is mutable, so instead
        // of removing elements in place we track how many leading entries to keep and join a
        // subList view at the end.
        int keep = xff.size();
        if (keep > 1) {
          // Drop the load balancer IP and the global forwarding rule IP.
          keep -= 2;
        }
        if (schemasLocation != null) {
          // Drop further trailing entries as indicated by schema metadata.
          try {
            final Integer geoipSkipEntries = metadataStore.getSchema(attributes)
                .geoip_skip_entries();
            if (geoipSkipEntries != null) {
              // A negative value skips nothing; a value larger than what is left empties
              // the list rather than failing.
              final int skip = Math.max(geoipSkipEntries.intValue(), 0);
              keep -= Math.min(skip, keep);
            }
          } catch (SchemaNotFoundException ignored) {
            // this function is not allowed to fail, so ignore the lack of schema
          }
        }
        // If nothing is left the attribute is set to the empty string.
        attributes.put(Attribute.X_FORWARDED_FOR, String.join(",", xff.subList(0, keep)));
      });
  // Remove unused ip from attributes
  attributes.remove(Attribute.REMOTE_ADDR);
  // Remove null attributes because the coder can't handle them.
  attributes.values().removeIf(Objects::isNull);
  // Return new message.
  out.output(new PubsubMessage(message.getPayload(), attributes));
}