public void processElement()

in src/main/java/com/google/solutions/df/log/aggregations/common/IpToGeoDoFn.java [74:96]


  public void processElement(ProcessContext c)
      throws UnknownHostException, IOException, GeoIp2Exception {
    String srcIP = c.element().getString("srcIP");
    Optional<CityResponse> response = reader.tryCity(InetAddress.getByName(srcIP));
    if (response.isPresent()) {
      Row defaultRowWithGeo = c.element();
      String countryName = response.get().getCountry().getName();
      String cityName = response.get().getCity().getName();
      Double latitude = response.get().getLocation().getLatitude();
      Double longitude = response.get().getLocation().getLongitude();

      
      c.output(Row.fromRow(defaultRowWithGeo)
    		  .withFieldValue("geoCountry", countryName)
    		  .withFieldValue("geoCity", cityName)
    		  .withFieldValue("latitude", latitude)
    		  .withFieldValue("longitude", longitude)
    		  .build());

    } else {
      c.output(c.element());
    }
  }