in streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathFilter.java [75:157]
/**
 * Evaluates the configured {@code jsonPath} against the datum's document and stores the
 * match on that document under the field {@code destNodeName}.
 *
 * <p>The document may be an {@link ObjectNode} (serialized to text for path evaluation) or a
 * JSON {@link String} (parsed into an {@link ObjectNode} so it can be modified). The match is
 * written onto the object reached by following the intermediate segments of
 * {@code pathExpression}, i.e. the parent of the final segment.
 *
 * <p>Only three kinds of match are copied: a {@code JSONArray} (a single-element array is
 * unwrapped to its sole item), a {@code JSONObject}, and a {@code String}. Any other kind of
 * match leaves the document unmodified.
 *
 * @param entry datum whose document is inspected and, when it is an {@link ObjectNode},
 *     modified in place
 * @return a list holding one new datum wrapping the (possibly modified) document, or an empty
 *     list when a {@code String} document cannot be parsed as a JSON object
 * @throws NullPointerException if the document is neither an {@link ObjectNode} nor a
 *     {@code String}, if the path yields no result (including when evaluation fails), or if
 *     the parent object of the final path segment does not exist
 * @throws ClassCastException if an intermediate path segment resolves to something other
 *     than a JSON object
 */
public List<StreamsDatum> process(StreamsDatum entry) {
  List<StreamsDatum> result = new ArrayList<>();
  Object payload = entry.getDocument();
  LOGGER.debug("{} processing {}", STREAMS_ID,
      payload == null ? null : payload.getClass().getName());

  String json = null;
  ObjectNode document = null;
  if (payload instanceof ObjectNode) {
    document = (ObjectNode) payload;
    try {
      json = mapper.writeValueAsString(document);
    } catch (JsonProcessingException ex) {
      // Best effort: with no JSON text the path cannot be evaluated, so the document is
      // passed through unchanged further down.
      LOGGER.warn("Unable to serialize document to JSON", ex);
    }
  } else if (payload instanceof String) {
    json = (String) payload;
    try {
      document = mapper.readValue(json, ObjectNode.class);
    } catch (IOException ex) {
      LOGGER.warn("Unable to parse document as a JSON object", ex);
      // Nothing usable to emit; hand back the empty list rather than null.
      return result;
    }
  }
  Objects.requireNonNull(document, "document must be an ObjectNode or a JSON String");

  if (StringUtils.isNotEmpty(json)) {
    Object srcResult = null;
    try {
      srcResult = jsonPath.read(json);
    } catch (Exception ex) {
      LOGGER.warn("Unable to evaluate JSON path against document", ex);
    }
    Objects.requireNonNull(srcResult, "JSON path produced no result");

    // Walk down to the parent of the final path segment. Segment 0 is skipped (presumably
    // the "$" root marker - confirm against how pathExpression is configured). The split is
    // a plain '.' split, so segments containing brackets or filters are used verbatim as
    // field names.
    String[] path = StringUtils.split(pathExpression, '.');
    ObjectNode node = document;
    for (int i = 1; i < path.length - 1 && node != null; i++) {
      // Descend from the node reached so far, not from the document root.
      node = (ObjectNode) node.get(path[i]);
    }
    Objects.requireNonNull(node, "parent node of the path expression not found in document");

    if (srcResult instanceof JSONArray) {
      try {
        ArrayNode matches = mapper.convertValue(srcResult, ArrayNode.class);
        if (matches.size() == 1) {
          // A lone match is stored as the item itself rather than a one-element array.
          JsonNode item = matches.get(0);
          node.set(destNodeName, item);
        } else {
          node.set(destNodeName, matches);
        }
      } catch (Exception ex) {
        LOGGER.warn("Unable to store array result", ex);
      }
    } else if (srcResult instanceof JSONObject) {
      try {
        ObjectNode match = mapper.convertValue(srcResult, ObjectNode.class);
        node.set(destNodeName, match);
      } catch (Exception ex) {
        LOGGER.warn("Unable to store object result", ex);
      }
    } else if (srcResult instanceof String) {
      try {
        node.put(destNodeName, (String) srcResult);
      } catch (Exception ex) {
        LOGGER.warn("Unable to store string result", ex);
      }
    }
  }

  // The outgoing datum is built around the document only.
  result.add(new StreamsDatum(document));
  return result;
}