in streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java [62:118]
/*
 * Derives "id" and "type" metadata for a datum from the content of its document.
 *
 * The document is read as a JSON object: a String document is parsed, any other
 * document is converted through the Jackson mapper. From that object:
 *   - "id"         -> metadata "id" and the datum's own id
 *   - "objectType" -> metadata "type", falling back to "verb" when "objectType"
 *                     is absent or has no usable text value
 *
 * Only textual JSON values are used; textValue() yields null for anything else,
 * and null/empty values are ignored. A document that cannot be turned into a
 * JSON object is logged at WARN and the datum is passed through with whatever
 * id and metadata it already had.
 *
 * entry: the datum to enrich; it is mutated in place.
 * Returns a single-element list holding the same datum instance.
 */
public List<StreamsDatum> process(StreamsDatum entry) {
  // Resolve the mapper on first use if it has not been set yet.
  if (mapper == null) {
    mapper = StreamsJacksonMapper.getInstance();
  }

  Map<String, Object> metadata = entry.getMetadata();
  if (metadata == null) {
    metadata = new HashMap<>();
  }

  Object document = entry.getDocument();
  ObjectNode objectNode = null;
  if (document instanceof String) {
    try {
      objectNode = mapper.readValue((String) document, ObjectNode.class);
    } catch (IOException ex) {
      LOGGER.warn("Can't deserialize to determine metadata", ex);
    }
  } else {
    try {
      objectNode = mapper.convertValue(document, ObjectNode.class);
    } catch (Exception ex) {
      LOGGER.warn("Can't deserialize to determine metadata", ex);
    }
  }

  String id = null;
  String type = null;
  if (objectNode != null) {
    if (objectNode.has("id")) {
      id = objectNode.get("id").textValue();
    }
    if (objectNode.has("verb")) {
      type = objectNode.get("verb").textValue();
    }
    if (objectNode.has("objectType")) {
      // "objectType" takes precedence over "verb", but only when it actually
      // carries a value; a null/non-text/empty objectType must not discard a
      // type that was already resolved from "verb".
      String objectType = objectNode.get("objectType").textValue();
      if (StringUtils.isNotEmpty(objectType)) {
        type = objectType;
      }
    }
  }

  if (StringUtils.isNotEmpty(id)) {
    metadata.put("id", id);
    // Only replace the datum id when the document supplied one; otherwise an
    // id assigned earlier in the pipeline would be overwritten with null.
    entry.setId(id);
  }
  if (StringUtils.isNotEmpty(type)) {
    metadata.put("type", type);
  }
  entry.setMetadata(metadata);

  List<StreamsDatum> result = new ArrayList<>();
  result.add(entry);
  return result;
}