public List process()

in streams-contrib/streams-processor-json/src/main/java/org/apache/streams/json/JsonPathExtractor.java [69:142]


  /**
   * Evaluates {@code jsonPath} against the document carried by {@code entry} and wraps each
   * String or JSON-object match in its own {@link StreamsDatum}.
   *
   * <p>An {@code ObjectNode} document is first serialized to a JSON string with {@code mapper};
   * a {@code String} document is used as-is. Any other document type, an empty string, or a
   * serialization failure produces no matches. When the path yields an array, every String or
   * object element becomes a separate datum; elements of any other kind (including JSON
   * {@code null}) are logged and skipped.
   *
   * <p>Failures while reading the path or converting a match are logged, not rethrown; matches
   * collected before the failure are still returned.
   *
   * @param entry datum whose document is queried; {@code null} yields an empty list
   * @return the matches in the order they were encountered; never {@code null}
   */
  public List<StreamsDatum> process(StreamsDatum entry) {

    List<StreamsDatum> result = new ArrayList<>();

    if (entry == null) {
      LOGGER.warn("{} received a null datum", STREAMS_ID);
      return result;
    }

    Object document = entry.getDocument();

    // Log the document's type rather than its content; the document may be arbitrarily large.
    LOGGER.debug("{} processing {}", STREAMS_ID, document == null ? null : document.getClass());

    String json = null;

    if (document instanceof ObjectNode) {
      try {
        json = mapper.writeValueAsString(document);
      } catch (JsonProcessingException ex) {
        // toString() keeps the exception type and is never null, unlike getMessage().
        LOGGER.warn("Unable to serialize document to JSON: {}", ex.toString());
        LOGGER.debug("Serialization failure detail", ex);
      }
    } else if (document instanceof String) {
      json = (String) document;
    }

    if (StringUtils.isEmpty(json)) {
      // Message kept verbatim from the original; it fires when there is no JSON to query.
      LOGGER.warn("result empty");
      return result;
    }

    try {
      Object readResult = jsonPath.read(json);

      if (readResult instanceof String) {
        LOGGER.info("Matched String: {}", readResult);
        result.add(new StreamsDatum((String) readResult));
      } else if (readResult instanceof JSONObject || readResult instanceof LinkedHashMap) {
        // LinkedHashMap is accepted here as well so a single-object match is treated
        // the same way as an object found inside an array match.
        LOGGER.info("Matched Object: {}", readResult);
        result.add(toObjectNodeDatum(readResult));
      } else if (readResult instanceof JSONArray) {
        LOGGER.info("Matched Array:");
        JSONArray array = (JSONArray) readResult;
        for (Object item : array) {
          if (item instanceof String) {
            LOGGER.info("String Item:{}", item);
            result.add(new StreamsDatum((String) item));
          } else if (item instanceof JSONObject || item instanceof LinkedHashMap) {
            LOGGER.info("Object Item:{}", item);
            result.add(toObjectNodeDatum(item));
          } else {
            // Parameterized logging is null-safe: a JSON null element must not abort the loop.
            LOGGER.info("Other Item:{}", item);
          }
        }
      } else {
        LOGGER.info("Other Match:{}", readResult);
      }

    } catch (Exception ex) {
      // Best-effort by design: report the failure and return whatever was matched so far.
      // The stack trace goes to debug only, since a path that does not match may be routine.
      LOGGER.warn("Unable to extract matches: {}", ex.toString());
      LOGGER.debug("Extraction failure detail", ex);
    }

    return result;

  }

  /**
   * Converts a map-like JSONPath match into an {@code ObjectNode} by round-tripping it through
   * {@code mapper}, and wraps the node in a new datum.
   *
   * @param match object match returned by the JSONPath read
   * @return datum whose document is the resulting {@code ObjectNode}
   * @throws java.io.IOException if the match cannot be serialized or parsed back
   */
  private StreamsDatum toObjectNodeDatum(Object match) throws java.io.IOException {
    ObjectNode objectNode = mapper.readValue(mapper.writeValueAsString(match), ObjectNode.class);
    return new StreamsDatum(objectNode);
  }