in ingestion-beam/src/main/java/com/mozilla/telemetry/decoder/ipprivacy/HashClientInfo.java [120:160]
public PCollection<PubsubMessage> expand(PCollection<PubsubMessage> input) {
  // Maps every message to a new one whose client id and client ip attributes have been
  // replaced by keyed hashes. The result is built from the original payload and an updated
  // copy of the attributes.
  //
  // Failure modes, all raised per element:
  //   - MissingKeyException when either hash key path is null;
  //   - IdenticalKeyException when both loaded keys hold the same bytes;
  //   - RuntimeException wrapping an IOException from loading the keys, or an
  //     InvalidKeyException / NoSuchAlgorithmException raised while hashing.
  return input.apply(
      MapElements.into(TypeDescriptor.of(PubsubMessage.class)).via((PubsubMessage message) -> {
        // Edits go into a private copy; the incoming attribute map is only read.
        Map<String, String> outputAttributes = new HashMap<>(message.getAttributeMap());

        boolean keyPathsConfigured = clientIdHashKeyPath != null && clientIpHashKeyPath != null;
        if (!keyPathsConfigured) {
          throw new MissingKeyException();
        }

        byte[] idKey;
        byte[] ipKey;
        try {
          idKey = getClientIdHashKey();
          ipKey = getClientIpHashKey();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }

        // The two attributes must not be hashed with the same key material.
        if (Arrays.equals(idKey, ipKey)) {
          throw new IdenticalKeyException();
        }

        String clientIdValue = outputAttributes.get(Attribute.CLIENT_ID);
        String clientIpValue = outputAttributes.get(Attribute.CLIENT_IP);
        try {
          // An attribute that is absent, or that isHashed already accepts, is left as is.
          boolean idNeedsHashing = !(clientIdValue == null || isHashed(clientIdValue));
          if (idNeedsHashing) {
            outputAttributes.put(Attribute.CLIENT_ID, keyedHash(clientIdValue, idKey));
          }
          boolean ipNeedsHashing = !(clientIpValue == null || isHashed(clientIpValue));
          if (ipNeedsHashing) {
            outputAttributes.put(Attribute.CLIENT_IP, keyedHash(clientIpValue, ipKey));
          }
        } catch (InvalidKeyException | NoSuchAlgorithmException e) {
          throw new RuntimeException(e);
        }

        return new PubsubMessage(message.getPayload(), outputAttributes);
      }));
}