in streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java [175:253]
public void prepare(Object configurationObject) {
this.streamsConfiguration = StreamsConfigurator.detectConfiguration();
if( configurationObject instanceof TwitterFollowingConfiguration) {
this.config = (TwitterFollowingConfiguration) configurationObject;
}
Objects.requireNonNull(config);
Objects.requireNonNull(config.getOauth());
Objects.requireNonNull(config.getOauth().getConsumerKey());
Objects.requireNonNull(config.getOauth().getConsumerSecret());
Objects.requireNonNull(config.getOauth().getAccessToken());
Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
Objects.requireNonNull(config.getInfo());
Objects.requireNonNull(config.getThreadsPerProvider());
StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
try {
client = getTwitterClient();
} catch (InstantiationException e) {
LOGGER.error("InstantiationException", e);
}
Objects.requireNonNull(client);
try {
lock.writeLock().lock();
providerQueue = QueueUtils.constructQueue();
} finally {
lock.writeLock().unlock();
}
Objects.requireNonNull(providerQueue);
// abstract this out
for (String s : config.getInfo()) {
if (s != null) {
String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
// See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
// screen name list
try {
ids.add(Long.parseLong(potentialScreenName));
} catch (Exception ex) {
names.add(potentialScreenName);
}
}
}
Objects.requireNonNull(getConfig().getEndpoint());
executor = MoreExecutors.listeningDecorator(
ExecutorUtils.newFixedThreadPoolWithQueueSize(
config.getThreadsPerProvider().intValue(),
streamsConfiguration.getQueueSize().intValue()
)
);
Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
for (Long id : ids) {
Callable<Object> callable = createTask(id, null);
LOGGER.info("Thread Created: {}", id);
tasks.add(callable);
futures.add(executor.submit(callable));
LOGGER.info("Thread Submitted: {}", id);
}
for (String name : names) {
Callable<Object> callable = createTask(null, name);
LOGGER.info("Thread Created: {}", name);
tasks.add(callable);
futures.add(executor.submit(callable));
LOGGER.info("Thread Submitted: {}", name);
}
}