in pipelines/etl_integration_java/src/main/java/com/google/cloud/dataflow/solutions/transform/TaxiEventProcessor.java [217:228]
public void processElement(@Element String jsonStr, OutputReceiver<String> receiver) {
try {
JsonNode jsonNode = mapper().readTree(jsonStr);
JsonNode sanitized = sanitizeNode(jsonNode, schema);
String sanitizedJsonStr = sanitized.toString();
receiver.output(sanitizedJsonStr);
} catch (JsonProcessingException e) {
// Just pass the string to the next transformation, and let that one report the JSON
// parsing error
receiver.output(jsonStr);
}
}