protected Set queueFeedEntries()

in streams-contrib/streams-provider-rss/src/main/java/org/apache/streams/rss/provider/RssStreamProviderTask.java [173:221]


  protected Set<String> queueFeedEntries(URL feedUrl) throws IOException, FeedException {

    // ConcurrentHashSet is preferable, but it's only in guava 15+
    // spark 1.5.0 uses guava 14 so for the moment this is the workaround
    Set<String> batch = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
    URLConnection connection = feedUrl.openConnection();
    connection.setConnectTimeout(this.timeOut);
    connection.setConnectTimeout(this.timeOut);
    SyndFeedInput input = new SyndFeedInput();
    SyndFeed feed = input.build(new InputStreamReader(connection.getInputStream()));
    for (Object entryObj : feed.getEntries()) {
      SyndEntry entry = (SyndEntry) entryObj;
      ObjectNode nodeEntry = this.serializer.deserialize(entry);
      nodeEntry.put(RSS_KEY, this.rssFeed);
      String entryId = determineId(nodeEntry);
      batch.add(entryId);
      StreamsDatum datum = new StreamsDatum(nodeEntry);
      try {
        JsonNode published = nodeEntry.get(DATE_KEY);
        if (published != null) {
          try {
            DateTime date = RFC3339Utils.parseToUTC(published.asText());
            if (date.isAfter(this.publishedSince) && (!this.perpetual || !seenBefore(entryId, this.rssFeed))) {
              this.dataQueue.put(datum);
              LOGGER.debug("Added entry, {}, to provider queue.", entryId);
            }
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
          } catch (Exception ex) {
            LOGGER.trace("Failed to parse date from object node, attempting to add node to queue by default.");
            if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
              this.dataQueue.put(datum);
              LOGGER.debug("Added entry, {}, to provider queue.", entryId);
            }
          }
        } else {
          LOGGER.debug("No published date present, attempting to add node to queue by default.");
          if (!this.perpetual || !seenBefore(entryId, this.rssFeed)) {
            this.dataQueue.put(datum);
            LOGGER.debug("Added entry, {}, to provider queue.", entryId);
          }
        }
      } catch (InterruptedException ie) {
        LOGGER.error("Interupted Exception.");
        Thread.currentThread().interrupt();
      }
    }
    return batch;
  }