public PubsubMessage apply()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/GeoCityLookup.java [163:245]


    /**
     * Adds geo attributes looked up from the client IP found in the
     * {@code Attribute.X_FORWARDED_FOR} attribute, and drops that attribute from the result.
     *
     * <p>A message that already carries {@code Attribute.GEO_COUNTRY} is returned as-is after the
     * non-null check. Otherwise the attributes are copied, the last comma-separated entry of the
     * forwarded-for value is resolved and looked up, and whichever of the country, city, DMA code,
     * and subdivision attributes are available get populated. Lookup failures are swallowed so the
     * message still flows through without geo attributes; null-valued attributes are removed
     * before the new message is built.
     *
     * @param message the incoming message; passed through {@code PubsubConstraints.ensureNonNull}
     * @return the original message if geo was already applied, otherwise a new message with the
     *     same payload and the updated attribute map
     * @throws UncheckedIOException if loading resources or the lookup raises an
     *     {@link IOException} other than {@link MalformedInputException}
     */
    public PubsubMessage apply(PubsubMessage message) {
      message = PubsubConstraints.ensureNonNull(message);
      try {
        if (geoIP2City == null) {
          // Throws IOException
          loadResourcesOnFirstMessage();
        }

        if (message.getAttributeMap().containsKey(Attribute.GEO_COUNTRY)) {
          // Return early since geo has already been applied;
          // we are likely reprocessing a message from error output.
          countGeoAlreadyApplied.inc();
          return message;
        }

        // Work on a copy so the incoming message's attribute map is never mutated.
        Map<String, String> attributes = new HashMap<>(message.getAttributeMap());

        // Determine client ip
        String xff = attributes.get(Attribute.X_FORWARDED_FOR);
        if (xff != null) {
          // In practice, many of the "first" addresses are bogus or internal,
          // so we target the immediate sending client IP by choosing the last entry.
          String[] ips = xff.split("\\s*,\\s*");
          // split() drops trailing empty strings, so a value made only of separators (e.g. ",")
          // yields a zero-length array. Indexing it would throw outside the try block below and
          // fail the element; fall back to the empty string instead, which is the same value an
          // empty header produces, so the lookup simply comes back without a match.
          String ip = ips.length > 0 ? ips[ips.length - 1] : "";

          try {
            attributes.put(Attribute.GEO_DB_VERSION, DateTimeFormatter.ISO_INSTANT
                .format(Instant.ofEpochMilli(geoIP2City.getMetadata().getBuildDate().getTime())));

            // Throws UnknownHostException
            // NOTE(review): getByName resolves host names as well as IP literals, and this value
            // comes from a client-supplied header - confirm a DNS lookup here is acceptable.
            InetAddress ipAddress = InetAddress.getByName(ip);
            foundIp.inc();

            // Throws GeoIp2Exception, MalformedInputException, and IOException
            CityResponse response = geoIP2City.city(ipAddress);
            foundCity.inc();

            String countryCode = response.getCountry().getIsoCode();
            attributes.put(Attribute.GEO_COUNTRY, countryCode);

            Integer dmaCode = response.getLocation().getMetroCode();
            if (dmaCode != null) {
              foundDmaCode.inc();
            }

            // City name and DMA code are only emitted when the city passes the allow check.
            City city = response.getCity();
            if (cityAllowed(city.getGeoNameId())) {
              attributes.put(Attribute.GEO_CITY, city.getName());
              foundCityAllowed.inc();

              if (dmaCode != null) {
                attributes.put(Attribute.GEO_DMA_CODE, dmaCode.toString());
                foundDmaCodeAllowed.inc();
              }
            }

            // Record up to two subdivision levels, checking the size explicitly rather than
            // relying on an out-of-bounds exception to stop.
            List<Subdivision> subdivisions = response.getSubdivisions();
            if (subdivisions.size() > 0) {
              attributes.put(Attribute.GEO_SUBDIVISION1, subdivisions.get(0).getIsoCode());
              foundGeo1.inc();
            }
            if (subdivisions.size() > 1) {
              attributes.put(Attribute.GEO_SUBDIVISION2, subdivisions.get(1).getIsoCode());
              foundGeo2.inc();
            }

          } catch (MalformedInputException ignored) {
            malformedInput.inc();
          } catch (UnknownHostException | GeoIp2Exception | IndexOutOfBoundsException ignored) {
            // Best effort: an unresolvable or unknown address just means no geo attributes.
            // IndexOutOfBoundsException is still caught so that anything previously swallowed
            // here continues to be swallowed.
          }
        }

        // remove client ip from attributes
        attributes.remove(Attribute.X_FORWARDED_FOR);

        // remove null attributes because the coder can't handle them
        attributes.values().removeIf(Objects::isNull);

        return new PubsubMessage(message.getPayload(), attributes);
      } catch (IOException e) {
        // Re-throw unchecked, so that the pipeline will fail at run time if it occurs
        throw new UncheckedIOException(e);
      }
    }