in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/GeoCityLookup.java [163:245]
/**
 * Adds geo attributes derived from the client IP in the {@code X_FORWARDED_FOR} attribute and
 * returns a new message with that IP attribute removed.
 *
 * <p>The geo database is loaded on the first call if {@code geoIP2City} is still null. If the
 * message already has a {@code GEO_COUNTRY} attribute, it is returned unchanged, because geo has
 * already been applied (likely a reprocessed message from error output).
 *
 * <p>Lookup failures for one message are tolerated, and the message is emitted with whatever geo
 * attributes could be determined. These failures include an unparseable address, an address that
 * is not in the database, or malformed database input. A missing or unusable
 * {@code X_FORWARDED_FOR} value skips the lookup entirely.
 *
 * @param message the incoming message; it is passed through
 *     {@code PubsubConstraints.ensureNonNull} before use
 * @return a new message with geo attributes added, {@code X_FORWARDED_FOR} removed, and
 *     null-valued attributes dropped; or the original message if geo was already applied
 * @throws UncheckedIOException if loading the geo database or querying it fails with an
 *     {@link IOException}, so that the pipeline fails loudly
 */
public PubsubMessage apply(PubsubMessage message) {
  message = PubsubConstraints.ensureNonNull(message);
  try {
    if (geoIP2City == null) {
      // Throws IOException
      loadResourcesOnFirstMessage();
    }
    if (message.getAttributeMap().containsKey(Attribute.GEO_COUNTRY)) {
      // Return early since geo has already been applied;
      // we are likely reprocessing a message from error output.
      countGeoAlreadyApplied.inc();
      return message;
    }
    // Work on a copy so the input message's attribute map is never modified.
    Map<String, String> attributes = new HashMap<>(message.getAttributeMap());
    String ip = lastForwardedIp(attributes.get(Attribute.X_FORWARDED_FOR));
    if (ip != null) {
      try {
        attributes.put(Attribute.GEO_DB_VERSION, DateTimeFormatter.ISO_INSTANT
            .format(Instant.ofEpochMilli(geoIP2City.getMetadata().getBuildDate().getTime())));
        // Throws UnknownHostException.
        // NOTE(review): for a non-literal host name, getByName performs a DNS lookup on a
        // client-supplied value; consider a literal-only address parser here.
        InetAddress ipAddress = InetAddress.getByName(ip);
        foundIp.inc();
        // Throws GeoIp2Exception, MalformedInputException, and IOException
        CityResponse response = geoIP2City.city(ipAddress);
        foundCity.inc();
        String countryCode = response.getCountry().getIsoCode();
        attributes.put(Attribute.GEO_COUNTRY, countryCode);
        Integer dmaCode = response.getLocation().getMetroCode();
        if (dmaCode != null) {
          foundDmaCode.inc();
        }
        City city = response.getCity();
        // City name and DMA code are only emitted for cities that pass the allow check.
        if (cityAllowed(city.getGeoNameId())) {
          attributes.put(Attribute.GEO_CITY, city.getName());
          foundCityAllowed.inc();
          if (dmaCode != null) {
            attributes.put(Attribute.GEO_DMA_CODE, dmaCode.toString());
            foundDmaCodeAllowed.inc();
          }
        }
        // Fill as many subdivision levels as the response provides (zero, one, or two).
        List<Subdivision> subdivisions = response.getSubdivisions();
        if (subdivisions.size() > 0) {
          attributes.put(Attribute.GEO_SUBDIVISION1, subdivisions.get(0).getIsoCode());
          foundGeo1.inc();
          if (subdivisions.size() > 1) {
            attributes.put(Attribute.GEO_SUBDIVISION2, subdivisions.get(1).getIsoCode());
            foundGeo2.inc();
          }
        }
      } catch (MalformedInputException ignored) {
        malformedInput.inc();
      } catch (UnknownHostException | GeoIp2Exception ignored) {
        // Best effort: an unparseable address or one the database cannot resolve simply
        // leaves the remaining geo attributes unset.
      }
    }
    // remove client ip from attributes
    attributes.remove(Attribute.X_FORWARDED_FOR);
    // remove null attributes because the coder can't handle them
    attributes.values().removeIf(Objects::isNull);
    return new PubsubMessage(message.getPayload(), attributes);
  } catch (IOException e) {
    // Re-throw unchecked, so that the pipeline will fail at run time if it occurs
    throw new UncheckedIOException(e);
  }
}

/**
 * Returns the last entry of an X-Forwarded-For value, or null if there is no usable entry.
 *
 * <p>Many of the earlier entries are bogus or internal in practice, so the entry appended by
 * the immediate sending client (the last one) is chosen. Null is returned when the value is
 * null, consists only of separators (for which {@link String#split} yields an empty array), or
 * ends in an empty entry. An empty string would otherwise be resolved by
 * {@code InetAddress.getByName} to the loopback address.
 */
private static String lastForwardedIp(String xff) {
  if (xff == null) {
    return null;
  }
  String[] ips = xff.trim().split("\\s*,\\s*");
  if (ips.length == 0) {
    return null;
  }
  String last = ips[ips.length - 1];
  return last.isEmpty() ? null : last;
}