protected void collectActivityData()

in streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityCollector.java [106:159]


  /**
   * Pulls the user's public activities page by page and queues every activity whose
   * publish time falls inside the window configured on {@code userInfo}.
   *
   * <p>An activity is accepted when it was published after {@code afterDate} and before
   * {@code beforeDate}; a {@code null} bound is treated as unbounded. Accepted activities
   * are serialized to JSON with {@code MAPPER} and put on {@code datumQueue} as a
   * {@code StreamsDatum} keyed by the activity id.
   *
   * <p>Paging stops when the feed has no next page token, when an activity published
   * before {@code afterDate} is seen, when {@code backoffAndIdentifyIfRetry} declines a
   * retry after a {@code GoogleJsonResponseException}, or when the number of failed
   * requests reaches {@code MAX_ATTEMPTS}. Note that the failure counter is cumulative
   * over the whole pull; it is not reset after a successful page.
   *
   * <p>Any other failure is logged and ends the pull; it is never propagated to the
   * caller. If the failure is an {@link InterruptedException}, the thread's interrupt
   * flag is restored first.
   */
  protected void collectActivityData() {
    int itemCount = 0;    // activities that passed the date filter and were queued
    int fetchedCount = 0; // activities returned by the API, before filtering
    int pageCount = 0;    // pages processed successfully
    try {
      ActivityFeed feed = null;
      int attempt = 0;
      DateTime afterDate = userInfo.getAfterDate();
      DateTime beforeDate = userInfo.getBeforeDate();
      // Recomputed on every iteration so that a retry decision from an earlier failure
      // can never leak into later pages and restart the pull from the first page.
      boolean keepPulling;
      do {
        try {
          if (feed == null) {
            feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION)
                .setMaxResults(MAX_RESULTS).execute();
          } else {
            // After a failed request 'feed' still refers to the last page that was fetched
            // successfully, so a retry asks for the same page again.
            feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION)
                .setMaxResults(MAX_RESULTS)
                .setPageToken(feed.getNextPageToken()).execute();
          }
          this.backOff.reset(); //successful pull reset api.
          // A page without items may come back with a null list; treat it as empty
          // instead of letting a NullPointerException abort the whole pull.
          if (feed.getItems() != null) {
            fetchedCount += feed.getItems().size();
            for (com.google.api.services.plus.model.Activity activity : feed.getItems()) {
              DateTime published = new DateTime(activity.getPublished().getValue());
              boolean afterLowerBound = afterDate == null || afterDate.isBefore(published);
              boolean beforeUpperBound = beforeDate == null || beforeDate.isAfter(published);
              if (afterLowerBound && beforeUpperBound) {
                String json = MAPPER.writeValueAsString(activity);
                this.datumQueue.put(new StreamsDatum(json, activity.getId()));
                itemCount++;
              } else if (afterDate != null && afterDate.isAfter(published)) {
                // NOTE(review): stopping here presumes the feed is ordered newest first,
                // so nothing further can fall inside the window - confirm against the API.
                feed.setNextPageToken(null); // do not fetch next page
                break;
              }
            }
          }
          pageCount += 1;
          keepPulling = feed.getNextPageToken() != null;
        } catch (GoogleJsonResponseException gjre) {
          // The helper's verdict is the only thing that decides whether the failed request
          // is repeated; a non-retryable error ends the pull even if earlier pages succeeded.
          keepPulling = backoffAndIdentifyIfRetry(gjre, this.backOff);
          ++attempt;
        }
      }
      while (keepPulling && attempt < MAX_ATTEMPTS);
    } catch (Throwable th) {
      if (th instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      // Throwable goes last, without a placeholder, so the logger records its stack trace.
      LOGGER.warn("Unable to pull Activities for user={}", this.userInfo.getUserId(), th);
    }

    LOGGER.info("item_count: {} last_count: {} page_count: {} ", itemCount, fetchedCount, pageCount);

  }