in streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java [173:221]
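/**
 * Fetches the RSS feed at {@code feedUrl}, converts each entry to JSON, and queues
 * entries that pass the published-date and seen-before checks.
 *
 * @param feedUrl the url of the feed to fetch
 * @return the set of entry ids encountered in this pass
 * @throws IOException when the connection to the feed fails
 * @throws FeedException when ROME cannot parse the feed
 */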
protected Set<String> queueFeedEntries(URL feedUrl) throws IOException, FeedException {
  // ConcurrentHashSet is preferable, but it's only in Guava 15+.
  // Spark 1.5.0 uses Guava 14, so for the moment this is the workaround.
  Set<String> batch = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
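  // Fetch the feed with bounded connect and read timeouts, then parse it with ROME.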
  URLConnection connection = feedUrl.openConnection();
  connection.setConnectTimeout(this.timeOut);
  connection.setReadTimeout(this.timeOut);
  SyndFeedInput input = new SyndFeedInput();
  SyndFeed feed = input.build(new InputStreamReader(connection.getInputStream()));
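  // Convert each entry to an ObjectNode, tag it with its source feed, and record its id in the batch.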
  for (Object entryObj : feed.getEntries()) {
    SyndEntry entry = (SyndEntry) entryObj;
    ObjectNode nodeEntry = this.serializer.deserialize(entry);
    nodeEntry.put(RSS_KEY, this.rssFeed);
    String entryId = determineId(nodeEntry);
    batch.add(entryId);
    StreamsDatum datum = new StreamsDatum(nodeEntry);
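    // Queue the datum unless it predates publishedSince or, in perpetual mode, has been seen before.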
    try {
      JsonNode published = nodeEntry.get(DATE_KEY);
      if (published != null) {
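        // Parse the published date as RFC 3339; a parse failure falls through to queueing by default.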
        try {
          DateTime date = RFC3339Utils.parseToUTC(published.asText());
          if (date.isAfter(this.publishedSince) && (!this.perpetual || !seenBefore(entryId, this.rssFeed))) {
            this.dataQueue.put(datum);
            LOGGER.debug("Added entry, {}, to provider queue.", entryId);
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        } catch (Exception ex) {
          LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default.");
          if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
            this.dataQueue.put(datum);
            LOGGER.debug("Added entry, {}, to provider queue.", entryId);
          }
        }
      } else {
        LOGGER.debug("No published date present, attempting to add node to queue by default.");
        if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
          this.dataQueue.put(datum);
          LOGGER.debug("Added entry, {}, to provider queue.", entryId);
        }
      }
    } catch (InterruptedException ie) {
      // dataQueue.put(...) blocks while the queue is full; restore the interrupt flag on interruption.
      LOGGER.error("Interrupted Exception.");
      Thread.currentThread().interrupt();
    }
  }
  return batch;
}