in streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistReader.java [81:169]
/**
 * Reads every object stored under the configured default bucket.
 *
 * <p>The method first issues {@code GET <base>/buckets/<bucket>/keys?keys=true} and parses
 * the JSON response, then issues one {@code GET <base>/buckets/<bucket>/keys/<key>} per entry
 * of the {@code keys} array. Each successfully fetched body is queued as a
 * {@link StreamsDatum} built from the body text and the key.
 *
 * <p>A key whose fetch fails (I/O error, bad URI, HTTP status of 400 or above, or no response
 * body) is logged and skipped so a single bad key does not abort the whole read.
 *
 * @return the result set wrapping all fetched datums, or {@code null} when the key listing
 *     itself could not be obtained or parsed (callers must handle {@code null})
 */
public StreamsResultSet readAll() {
    Queue<StreamsDatum> readAllQueue = constructQueue();

    // Step 1: list the keys of the bucket.
    String listing;
    try {
        URIBuilder listKeys = riakKeysUri(null);
        listKeys.setParameter("keys", "true");
        listing = httpGetBody(listKeys);
    } catch (URISyntaxException e) {
        // Previously this was only logged and the method went on to dereference a null builder.
        LOGGER.warn("URISyntaxException", e);
        return null;
    }
    if (listing == null) {
        // httpGetBody has already logged the reason.
        return null;
    }

    JsonNode listingNode;
    try {
        listingNode = MAPPER.readValue(listing, JsonNode.class);
    } catch (IOException e) {
        LOGGER.warn("IOException", e);
        return null;
    }

    // Validate the shape instead of blindly casting: a missing or non-array "keys" field
    // used to surface as a NullPointerException / ClassCastException.
    JsonNode keysNode = (listingNode == null) ? null : listingNode.get("keys");
    if (keysNode == null || !keysNode.isArray()) {
        LOGGER.warn("List-keys response does not contain a 'keys' array: " + listing);
        return null;
    }

    // Step 2: fetch the value stored under each key.
    for (JsonNode keyNode : keysNode) {
        String key = keyNode.asText();
        String value;
        try {
            value = httpGetBody(riakKeysUri(key));
        } catch (URISyntaxException e) {
            LOGGER.warn("URISyntaxException", e);
            continue;
        }
        if (value == null) {
            continue;
        }
        readAllQueue.add(new StreamsDatum(value, key));
    }
    return new StreamsResultSet(readAllQueue);
}

/**
 * Builds the URI of the default bucket's key listing, or of a single key inside it.
 *
 * @param key key to address, or {@code null} to address the {@code /keys} collection itself
 * @return a builder rooted at the client's base URI with the bucket path applied
 * @throws URISyntaxException if the client's base URI cannot be parsed
 */
private URIBuilder riakKeysUri(String key) throws URISyntaxException {
    URIBuilder builder = new URIBuilder(client.baseURI.toString());
    String path = client.baseURI.getPath().concat("/buckets/" + configuration.getDefaultBucket() + "/keys");
    if (key != null) {
        path = path + "/" + key;
    }
    builder.setPath(path);
    return builder;
}

/**
 * Executes a GET request and returns the response body as text.
 *
 * <p>The body is read before the status is inspected so that the response entity is always
 * fully consumed, including on error responses.
 *
 * @param uriBuilder builder describing the request URI
 * @return the response body, or {@code null} if the request failed, the response carried no
 *     entity, or the server answered with a status of 400 or above (the cause is logged)
 */
private String httpGetBody(URIBuilder uriBuilder) {
    try {
        HttpResponse response = client.client().execute(new HttpGet(uriBuilder.build()));
        String body = null;
        if (response.getEntity() != null) {
            body = EntityUtils.toString(response.getEntity());
        }
        int status = response.getStatusLine().getStatusCode();
        if (status >= 400) {
            // An error page must not be handed downstream as if it were stored data.
            LOGGER.warn("Riak request " + uriBuilder + " failed with HTTP status " + status);
            return null;
        }
        if (body == null) {
            LOGGER.warn("Riak request " + uriBuilder + " returned no response body");
        }
        return body;
    } catch (IOException e) {
        LOGGER.warn("IOException", e);
        return null;
    } catch (URISyntaxException e) {
        LOGGER.warn("URISyntaxException", e);
        return null;
    }
}