public PubsubMessage apply()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/GeoIspLookup.java [84:136]


    public PubsubMessage apply(PubsubMessage message) {
      message = PubsubConstraints.ensureNonNull(message);

      try {
        if (ispReader == null) {
          loadResourcesOnFirstMessage();
        }

        if (message.getAttributeMap().containsKey(Attribute.ISP_NAME)) {
          // Return early since ISP lookup has already been performed.
          countIspAlreadyApplied.inc();
          return message;
        }

        // copy attributes
        Map<String, String> attributes = new HashMap<String, String>(message.getAttributeMap());

        // Determine client ip
        String xff = attributes.get(Attribute.X_FORWARDED_FOR);

        if (xff != null) {
          // In practice, many of the "first" addresses are bogus or internal,
          // so we target the immediate sending client IP by choosing the last entry.
          String[] ips = xff.split("\\s*,\\s*");
          String ip = ips[Math.max(ips.length - 1, 0)];

          try {
            attributes.put(Attribute.ISP_DB_VERSION, DateTimeFormatter.ISO_INSTANT
                .format(Instant.ofEpochMilli(ispReader.getMetadata().getBuildDate().getTime())));

            // Throws UnknownHostException
            InetAddress ipAddress = InetAddress.getByName(ip);
            foundIp.inc();

            IspResponse response = ispReader.isp(ipAddress);
            foundIsp.inc();

            attributes.put(Attribute.ISP_NAME, response.getIsp());
            attributes.put(Attribute.ISP_ORGANIZATION, response.getOrganization());
          } catch (UnknownHostException | GeoIp2Exception ignore) {
            // ignore these exceptions
          }
        }

        // remove null attributes because the coder can't handle them
        attributes.values().removeIf(Objects::isNull);

        return new PubsubMessage(message.getPayload(), attributes);
      } catch (IOException e) {
        // Re-throw unchecked, so that the pipeline will fail at run time if it occurs
        throw new UncheckedIOException(e);
      }
    }