public void processElement()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ParseProxy.java [50:104]


    public void processElement(@Element PubsubMessage rawMessage,
        OutputReceiver<PubsubMessage> out) {
      // Prevent null pointer exception
      final PubsubMessage message = PubsubConstraints.ensureNonNull(rawMessage);

      // Copy attributes
      final Map<String, String> attributes = new HashMap<>(message.getAttributeMap());

      Optional.ofNullable(attributes.get(Attribute.X_FORWARDED_FOR))
          .map(v -> Arrays.stream(v.split("\\s*,\\s*")).filter(StringUtils::isNotBlank)
              .collect(Collectors.toList()))
          .ifPresent(xff -> {
            // Google's load balancer will append the immediate sending client IP and a global
            // forwarding rule IP to any existing content in X-Forwarded-For as documented in:
            // https://cloud.google.com/load-balancing/docs/https/#components
            //
            // Our nginx setup will then append google's load balancer IP.
            //
            // In practice, many of the "first" addresses are bogus or internal, so remove load
            // balancer entries and downstream transforms will read the immediate sending client
            // IP from the end of the list.
            if (xff.size() > 1) {
              // Remove load balancer IP
              xff.remove(xff.size() - 1);
              // Remove global forwarding rule IP
              xff.remove(xff.size() - 1);
            }

            if (schemasLocation != null) {
              // Remove trailing entries as indicated by metadata
              try {
                final Integer geoipSkipEntries = metadataStore.getSchema(attributes)
                    .geoip_skip_entries();
                if (geoipSkipEntries != null) {
                  for (int i = 0; i < geoipSkipEntries && xff.size() > 0; i++) {
                    xff.remove(xff.size() - 1);
                  }
                }
              } catch (SchemaNotFoundException ignore) {
                // this function is not allowed to fail, so ignore the lack of schema
              }
            }

            attributes.put(Attribute.X_FORWARDED_FOR, String.join(",", xff));
          });

      // Remove unused ip from attributes
      attributes.remove(Attribute.REMOTE_ADDR);

      // Remove null attributes because the coder can't handle them.
      attributes.values().removeIf(Objects::isNull);

      // Return new message.
      out.output(new PubsubMessage(message.getPayload(), attributes));
    }