in src/main/java/com/google/solutions/df/log/aggregations/common/JsonToRowValidationTransform.java [80:102]
/**
 * Parses one JSON log line and checks that its {@code dstIP} and {@code srcIP} members are valid
 * IPv4 addresses.
 *
 * <p>Records that pass validation are emitted on the main output as the re-serialized JSON
 * object. Records that cannot be parsed, that parse to nothing (e.g. an empty line), or whose
 * {@code dstIP}/{@code srcIP} member is missing, not a primitive value, or not a valid IPv4
 * address are emitted on {@code Util.failureTag} with an error message instead of throwing.
 *
 * @param c the process context supplying the input element and receiving the outputs
 */
public void processElement(ProcessContext c) {
  String input = c.element();
  LOG.debug("log: {}", input);
  try {
    JsonObject convertedObject = gson.fromJson(input, JsonObject.class);
    // Gson returns null (rather than throwing) for empty input; treat it as a bad record.
    if (convertedObject == null) {
      String errMsg = String.format("Empty or null JSON input %s", input);
      LOG.error(errMsg);
      c.output(Util.failureTag, errMsg);
      return;
    }
    boolean validData =
        isValidInet4Member(convertedObject, "dstIP")
            && isValidInet4Member(convertedObject, "srcIP");
    if (validData) {
      c.output(convertedObject.toString());
    } else {
      String errMsg = String.format("Not a valid IP address %s", input);
      LOG.error(errMsg);
      c.output(Util.failureTag, errMsg);
    }
  } catch (JsonSyntaxException e) {
    c.output(Util.failureTag, e.getMessage());
  }
}

/**
 * Returns whether {@code json} has a primitive member named {@code member} whose string form is
 * a valid IPv4 address.
 *
 * <p>The presence and primitive checks come first because {@code JsonObject.get} returns
 * {@code null} for an absent member and {@code getAsString} throws for JSON null and objects;
 * without them a single malformed record would raise an unchecked exception out of
 * {@link #processElement}.
 */
private boolean isValidInet4Member(JsonObject json, String member) {
  if (!json.has(member) || !json.get(member).isJsonPrimitive()) {
    return false;
  }
  return InetAddressValidator.getInstance()
      .isValidInet4Address(json.get(member).getAsString());
}