in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/GeoIspLookup.java [84:136]
public PubsubMessage apply(PubsubMessage message) {
  // Adds ISP attributes (name, organization, database version) to the message based on the
  // last address listed in its X-Forwarded-For attribute.
  //
  // Returns the input message unchanged when it already carries an ISP name; otherwise returns
  // a new message with the same payload and an updated copy of the attributes. Lookup failures
  // for an individual address are ignored and simply leave the ISP attributes unset.
  //
  // Throws UncheckedIOException if an IOException occurs while loading resources or querying
  // the ISP database.
  message = PubsubConstraints.ensureNonNull(message);
  try {
    if (ispReader == null) {
      loadResourcesOnFirstMessage();
    }
    if (message.getAttributeMap().containsKey(Attribute.ISP_NAME)) {
      // Return early since ISP lookup has already been performed.
      countIspAlreadyApplied.inc();
      return message;
    }
    // Work on a copy so the attribute map of the incoming message is never modified.
    Map<String, String> attributes = new HashMap<String, String>(message.getAttributeMap());
    String xff = attributes.get(Attribute.X_FORWARDED_FOR);
    if (xff != null) {
      // In practice, many of the "first" addresses are bogus or internal,
      // so we target the immediate sending client IP by choosing the last entry.
      // The header is trimmed first so that the chosen entry carries no stray whitespace.
      String[] ips = xff.trim().split("\\s*,\\s*");
      // String.split drops trailing empty strings, so a header consisting only of separators
      // (e.g. ",") yields an empty array; treat that the same as an empty address.
      String ip = ips.length == 0 ? "" : ips[ips.length - 1];
      try {
        // Record the build date of the ISP database that is in use.
        attributes.put(Attribute.ISP_DB_VERSION, DateTimeFormatter.ISO_INSTANT
            .format(Instant.ofEpochMilli(ispReader.getMetadata().getBuildDate().getTime())));
        // InetAddress.getByName("") resolves to the loopback address rather than failing,
        // so an empty entry is skipped instead of being counted as a found IP.
        if (!ip.isEmpty()) {
          // Throws UnknownHostException
          InetAddress ipAddress = InetAddress.getByName(ip);
          foundIp.inc();
          IspResponse response = ispReader.isp(ipAddress);
          foundIsp.inc();
          attributes.put(Attribute.ISP_NAME, response.getIsp());
          attributes.put(Attribute.ISP_ORGANIZATION, response.getOrganization());
        }
      } catch (UnknownHostException | GeoIp2Exception ignored) {
        // Best effort: an unparseable or unknown address leaves the ISP attributes unset.
      }
    }
    // remove null attributes because the coder can't handle them
    attributes.values().removeIf(Objects::isNull);
    return new PubsubMessage(message.getPayload(), attributes);
  } catch (IOException e) {
    // Re-throw unchecked, so that the pipeline will fail at run time if it occurs
    throw new UncheckedIOException(e);
  }
}