in streams-contrib/streams-provider-google/google-gplus/src/main/java/org/apache/streams/gplus/provider/GPlusUserActivityCollector.java [106:159]
protected void collectActivityData() {
int item_count = 0;
int last_count = 0;
int page_count = 0;
try {
ActivityFeed feed = null;
boolean tryAgain = false;
int attempt = 0;
DateTime afterDate = userInfo.getAfterDate();
DateTime beforeDate = userInfo.getBeforeDate();
do {
try {
if (feed == null) {
feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION)
.setMaxResults(MAX_RESULTS).execute();
} else {
feed = this.plus.activities().list(this.userInfo.getUserId(), PUBLIC_COLLECTION)
.setMaxResults(MAX_RESULTS)
.setPageToken(feed.getNextPageToken()).execute();
}
this.backOff.reset(); //successful pull reset api.
last_count += feed.getItems().size();
for (com.google.api.services.plus.model.Activity activity : feed.getItems()) {
DateTime published = new DateTime(activity.getPublished().getValue());
if ((afterDate == null && beforeDate == null)
|| (beforeDate == null && afterDate.isBefore(published))
|| (afterDate == null && beforeDate.isAfter(published))
|| ((afterDate != null && beforeDate != null) && (afterDate.isBefore(published) && beforeDate.isAfter(published)))) {
String json = MAPPER.writeValueAsString(activity);
this.datumQueue.put(new StreamsDatum(json, activity.getId()));
item_count++;
} else if (afterDate != null && afterDate.isAfter(published)) {
feed.setNextPageToken(null); // do not fetch next page
break;
}
}
page_count += 1;
} catch (GoogleJsonResponseException gjre) {
tryAgain = backoffAndIdentifyIfRetry(gjre, this.backOff);
++attempt;
}
}
while ((tryAgain || (feed != null && feed.getNextPageToken() != null)) && attempt < MAX_ATTEMPTS);
} catch (Throwable th) {
if (th instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
th.printStackTrace();
LOGGER.warn("Unable to pull Activities for user={} : {}",this.userInfo.getUserId(), th);
}
LOGGER.info("item_count: {} last_count: {} page_count: {} ", item_count, last_count, page_count);
}