in streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java [193:255]
public void prepare(Object configurationObject) {
if( configurationObject instanceof TwitterTimelineProviderConfiguration ) {
this.config = (TwitterTimelineProviderConfiguration)configurationObject;
}
streamsConfiguration = StreamsConfigurator.detectConfiguration(StreamsConfigurator.getConfig());
try {
lock.writeLock().lock();
providerQueue = QueueUtils.constructQueue();
} finally {
lock.writeLock().unlock();
}
Objects.requireNonNull(providerQueue);
Objects.requireNonNull(config);
Objects.requireNonNull(config.getOauth().getConsumerKey());
Objects.requireNonNull(config.getOauth().getConsumerSecret());
Objects.requireNonNull(config.getOauth().getAccessToken());
Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
Objects.requireNonNull(config.getInfo());
Objects.requireNonNull(config.getThreadsPerProvider());
StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration();
try {
client = getTwitterClient();
} catch (InstantiationException e) {
LOGGER.error("InstantiationException", e);
}
Objects.requireNonNull(client);
for (String s : config.getInfo()) {
if (s != null) {
String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
// See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
// screen name list
try {
ids.add(Long.parseLong(potentialScreenName));
} catch (Exception ex) {
names.add(potentialScreenName);
}
}
}
executor = MoreExecutors.listeningDecorator(
ExecutorUtils.newFixedThreadPoolWithQueueSize(
config.getThreadsPerProvider().intValue(),
streamsConfiguration.getQueueSize().intValue()
)
);
Objects.requireNonNull(executor);
submitTimelineThreads(ids, names);
LOGGER.info("tasks: {}", tasks.size());
LOGGER.info("futures: {}", futures.size());
}