public List process()

in streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java [123:189]


  /**
   * Tags a datum with the ids of the percolator queries that match its document.
   *
   * <p>The document must be a JSON string or an {@code ObjectNode}. It is wrapped in a
   * {@code { "doc": ... }} percolate request, executed against the configured index and
   * document type, and the ids of all matches are passed to {@code appendMatches} together
   * with the document converted to an {@code Activity}. The datum's document is then
   * replaced by that activity.
   *
   * @param entry datum whose document should be percolated
   * @return a single-element list holding the updated {@code entry}, or {@code null} when the
   *     document is missing, is not a JSON object, or the percolate request fails
   */
  public List<StreamsDatum> process(StreamsDatum entry) {

    Object document = entry.getDocument();

    String json;
    ObjectNode node;
    // first check for valid json
    if (document instanceof String) {
      json = (String) document;
      Object parsed;
      try {
        parsed = mapper.readTree(json);
      } catch (IOException ex) {
        LOGGER.warn("Invalid json document: {}", ex.getMessage(), ex);
        return null;
      }
      // Valid JSON is not necessarily an object (arrays, scalars); reject those explicitly
      // instead of failing with a ClassCastException.
      if (!(parsed instanceof ObjectNode)) {
        LOGGER.warn("Document is not a json object: {}", json);
        return null;
      }
      node = (ObjectNode) parsed;
    } else if (document instanceof ObjectNode) {
      node = (ObjectNode) document;
      try {
        json = mapper.writeValueAsString(node);
      } catch (JsonProcessingException ex) {
        LOGGER.warn("Invalid datum: {}", node, ex);
        return null;
      }
    } else {
      // document may be null here, so do not dereference it unconditionally
      LOGGER.warn("Incompatible document type: {}",
          document == null ? null : document.getClass());
      return null;
    }

    String percolateRequestJson = "{ \"doc\": " + json + "}";

    PercolateRequestBuilder request;
    PercolateResponse response;

    try {
      LOGGER.trace("Percolate request json: {}", percolateRequestJson);
      request = manager.client()
          .preparePercolate()
          .setIndices(config.getIndex())
          .setDocumentType(config.getType())
          .setSource(percolateRequestJson);
      // Serialising the request is only useful for the trace output; skip it otherwise so a
      // serialisation problem cannot abort the percolate call itself.
      if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("Percolate request: {}", mapper.writeValueAsString(request.request()));
      }
      response = request.execute().actionGet();
      LOGGER.trace("Percolate response: {} matches", response.getMatches().length);
    } catch (Exception ex) {
      // Boundary catch: any client or transport failure drops this datum.
      LOGGER.warn("Percolate exception: {}", ex.getMessage(), ex);
      return null;
    }

    ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();

    for (PercolateResponse.Match match : response) {
      tagArray.add(match.getId().string());
    }

    LOGGER.trace("Percolate matches: {}", tagArray);

    Activity activity = mapper.convertValue(node, Activity.class);

    appendMatches(tagArray, activity);

    entry.setDocument(activity);

    List<StreamsDatum> result = new ArrayList<>();
    result.add(entry);

    return result;

  }