public StreamsResultSet readAll()

in streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java [81:169]


  /**
   * Reads every object listed under the configured default bucket.
   *
   * <p>The bucket's keys are listed with a GET on {@code <base path>/buckets/<bucket>/keys}
   * (query parameter {@code keys=true}); each key is then fetched individually and its response
   * body is queued as a {@link StreamsDatum} constructed from the body and the key. A key that
   * cannot be fetched is logged and skipped so that a single failure does not abort the read.
   *
   * @return a result set wrapping the queued datums, or {@code null} when the key listing itself
   *     could not be requested, was answered with a non-2xx status, or could not be parsed into
   *     a JSON document containing a {@code keys} array
   */
  public StreamsResultSet readAll() {

    Queue<StreamsDatum> readAllQueue = constructQueue();

    // Shared prefix of the listing URI and of every per-key URI; loop-invariant.
    String bucketKeysPath =
        client.baseURI.getPath().concat("/buckets/" + configuration.getDefaultBucket() + "/keys");

    URIBuilder lk;
    try {
      lk = new URIBuilder(client.baseURI.toString());
    } catch (URISyntaxException e) {
      LOGGER.warn("URISyntaxException", e);
      // Without a listing URI nothing can be read. Returning here (like the other listing
      // failures) avoids dereferencing a null builder further down.
      return null;
    }
    lk.setPath(bucketKeysPath);
    lk.setParameter("keys", "true");

    String lkEntityString = fetchEntityString(lk);
    if (lkEntityString == null) {
      return null;
    }

    JsonNode lkEntityNode;
    try {
      lkEntityNode = MAPPER.readValue(lkEntityString, JsonNode.class);
    } catch (IOException e) {
      LOGGER.warn("IOException", e);
      return null;
    }

    // Validate the shape before casting: a body that parsed but has no "keys" array would
    // otherwise surface as a NullPointerException or ClassCastException.
    JsonNode keysNode = (lkEntityNode == null) ? null : lkEntityNode.get("keys");
    if (keysNode == null || !keysNode.isArray()) {
      LOGGER.warn("Key listing response did not contain a \"keys\" array");
      return null;
    }

    for (JsonNode keyNode : (ArrayNode) keysNode) {
      String key = keyNode.asText();

      URIBuilder gk;
      try {
        gk = new URIBuilder(client.baseURI.toString());
      } catch (URISyntaxException e) {
        LOGGER.warn("URISyntaxException", e);
        continue;
      }
      // NOTE(review): the key is appended to the path verbatim; a key containing '/' would
      // presumably address a different resource - confirm how such keys should be escaped.
      gk.setPath(bucketKeysPath + "/" + key);

      String gkEntityString = fetchEntityString(gk);
      if (gkEntityString == null) {
        // Already logged by the helper; skip this key and keep going.
        continue;
      }

      readAllQueue.add(new StreamsDatum(gkEntityString, key));
    }

    return new StreamsResultSet(readAllQueue);
  }

  /**
   * Executes a GET for the URI described by {@code uriBuilder} and returns the response body.
   *
   * @param uriBuilder builder describing the request URI
   * @return the response body as a string, or {@code null} (after logging a warning) when the
   *     URI cannot be built, the request fails, the response carries no entity, or the status
   *     code is outside the 2xx range
   */
  private String fetchEntityString(URIBuilder uriBuilder) {
    try {
      HttpResponse response = client.client().execute(new HttpGet(uriBuilder.build()));
      if (response.getEntity() == null) {
        LOGGER.warn("No response entity for " + uriBuilder);
        return null;
      }
      // Read the body before inspecting the status so the entity is fully consumed even for
      // error responses.
      String body = EntityUtils.toString(response.getEntity());
      int status = response.getStatusLine().getStatusCode();
      if (status < 200 || status >= 300) {
        // An error body (e.g. for a key deleted after it was listed) is not a stored document.
        LOGGER.warn("Unexpected HTTP status " + status + " for " + uriBuilder);
        return null;
      }
      return body;
    } catch (IOException e) {
      LOGGER.warn("IOException", e);
      return null;
    } catch (URISyntaxException e) {
      LOGGER.warn("URISyntaxException", e);
      return null;
    }
  }