public List process()

in streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java [62:118]


  public List<StreamsDatum> process(StreamsDatum entry) {

    if ( mapper == null ) {
      mapper = StreamsJacksonMapper.getInstance();
    }

    List<StreamsDatum> result = new ArrayList<>();

    Map<String, Object> metadata = entry.getMetadata();
    if ( metadata == null ) {
      metadata = new HashMap<>();
    }

    String id = null;
    String type = null;

    Object document = entry.getDocument();
    ObjectNode objectNode = null;
    if ( document instanceof String) {
      try {
        objectNode = mapper.readValue((String) document, ObjectNode.class);
      } catch (IOException ex) {
        LOGGER.warn("Can't deserialize to determine metadata", ex);
      }
    } else {
      try {
        objectNode = mapper.convertValue(document, ObjectNode.class);
      } catch (Exception ex) {
        LOGGER.warn("Can't deserialize to determine metadata", ex);
      }
    }
    if ( objectNode != null ) {
      if (objectNode.has("id")) {
        id = objectNode.get("id").textValue();
      }
      if (objectNode.has("verb")) {
        type = objectNode.get("verb").textValue();
      }
      if (objectNode.has("objectType")) {
        type = objectNode.get("objectType").textValue();
      }
    }

    if (StringUtils.isNotEmpty(id) ) {
      metadata.put("id", id);
    }
    if (StringUtils.isNotEmpty(type) ) {
      metadata.put("type", type);
    }

    entry.setId(id);
    entry.setMetadata(metadata);

    result.add(entry);

    return result;
  }