public PCollection expand()

in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ipprivacy/HashClientInfo.java [120:160]


  public PCollection<PubsubMessage> expand(PCollection<PubsubMessage> input) {
    return input.apply(
        MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via((PubsubMessage message) -> {
          Map<String, String> attributes = new HashMap<>(message.getAttributeMap());

          if (clientIdHashKeyPath == null || clientIpHashKeyPath == null) {
            throw new MissingKeyException();
          }

          String clientId = attributes.get(Attribute.CLIENT_ID);
          String clientIp = attributes.get(Attribute.CLIENT_IP);

          byte[] clientIdKey;
          byte[] clientIpKey;

          try {
            clientIdKey = getClientIdHashKey();
            clientIpKey = getClientIpHashKey();
          } catch (IOException e) {
            throw new RuntimeException(e);
          }

          if (Arrays.equals(clientIdKey, clientIpKey)) {
            throw new IdenticalKeyException();
          }
          try {
            if (clientId != null && !isHashed(clientId)) {
              String hashedClientId = keyedHash(clientId, clientIdKey);
              attributes.put(Attribute.CLIENT_ID, hashedClientId);
            }
            if (clientIp != null && !isHashed(clientIp)) {
              String hashedClientIp = keyedHash(clientIp, clientIpKey);
              attributes.put(Attribute.CLIENT_IP, hashedClientIp);
            }
          } catch (InvalidKeyException | NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
          }

          return new PubsubMessage(message.getPayload(), attributes);
        }));
  }