in streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java [123:189]
/**
 * Tags a datum with the ids of the percolator queries that match its document.
 *
 * <p>The document must be either a {@code String} holding a JSON object or an
 * {@code ObjectNode}. It is wrapped as {@code { "doc": <json> }} and percolated against the
 * configured index and document type. The document is then converted to an {@code Activity},
 * passed to {@code appendMatches} together with the matching query ids, and written back
 * onto the datum.
 *
 * <p>Failures while reading the document or executing the percolate request are logged and
 * reported by returning {@code null}; they are not rethrown. The conversion to
 * {@code Activity} happens outside that handling, so an exception raised there propagates.
 *
 * @param entry the datum to tag; its document is replaced by the tagged {@code Activity}
 * @return a single-element list holding {@code entry}, or {@code null} when the document is
 *     missing, is of an unsupported type, is not a JSON object, or the percolate request fails
 */
public List<StreamsDatum> process(StreamsDatum entry) {
  Object document = entry.getDocument();
  if (document == null) {
    // Previously this fell through to getClass() on null and threw a NullPointerException.
    LOGGER.warn("Incompatible document type: null");
    return null;
  }

  String json;
  ObjectNode node;
  if (document instanceof String) {
    json = (String) document;
    // Held as Object so the instanceof test below rejects every non-object outcome
    // (null, missing content, arrays, scalars) instead of failing on a blind cast.
    Object parsed;
    try {
      parsed = mapper.readTree(json);
    } catch (IOException ex) {
      LOGGER.warn("Invalid datum: {}", json, ex);
      return null;
    }
    if (!(parsed instanceof ObjectNode)) {
      LOGGER.warn("Invalid datum, expected a JSON object: {}", json);
      return null;
    }
    node = (ObjectNode) parsed;
  } else if (document instanceof ObjectNode) {
    node = (ObjectNode) document;
    try {
      json = mapper.writeValueAsString(node);
    } catch (JsonProcessingException ex) {
      LOGGER.warn("Invalid datum: {}", node, ex);
      return null;
    }
  } else {
    LOGGER.warn("Incompatible document type: {}", document.getClass());
    return null;
  }

  String percolateRequestJson = "{ \"doc\": " + json + "}";

  PercolateResponse response;
  try {
    LOGGER.trace("Percolate request json: {}", percolateRequestJson);
    PercolateRequestBuilder request = manager.client()
        .preparePercolate()
        .setIndices(config.getIndex())
        .setDocumentType(config.getType())
        .setSource(percolateRequestJson);
    // Serializing the request is only useful for the trace message, so skip the work
    // (and any failure it could cause) unless trace logging is actually enabled.
    if (LOGGER.isTraceEnabled()) {
      LOGGER.trace("Percolate request: {}", mapper.writeValueAsString(request.request()));
    }
    response = request.execute().actionGet();
    LOGGER.trace("Percolate response: {} matches", response.getMatches().length);
  } catch (Exception ex) {
    // Trailing throwable argument makes the logger record the stack trace as well.
    LOGGER.warn("Percolate exception: {}", ex.getMessage(), ex);
    return null;
  }

  // Collect the id of every matching percolator query.
  ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
  for (PercolateResponse.Match match : response) {
    tagArray.add(match.getId().string());
  }
  LOGGER.trace("Percolate matches: {}", tagArray);

  Activity activity = mapper.convertValue(node, Activity.class);
  appendMatches(tagArray, activity);
  entry.setDocument(activity);

  List<StreamsDatum> result = new ArrayList<>();
  result.add(entry);
  return result;
}