in streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/SevenDaySearchProvider.java [183:236]
public void prepare(Object configurationObject) {
if( configurationObject instanceof SevenDaySearchProviderConfiguration ) {
this.config = (SevenDaySearchProviderConfiguration) configurationObject;
}
try {
lock.writeLock().lock();
providerQueue = QueueUtils.constructQueue();
} finally {
lock.writeLock().unlock();
}
Objects.requireNonNull(providerQueue);
Objects.requireNonNull(config.getOauth().getConsumerKey());
Objects.requireNonNull(config.getOauth().getConsumerSecret());
Objects.requireNonNull(config.getOauth().getAccessToken());
Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
Objects.requireNonNull(config.getQ());
Objects.requireNonNull(config.getThreadsPerProvider());
request = new SevenDaySearchRequest();
request.setQ(config.getQ());
request.setGeocode(config.getGeocode());
if( !Objects.isNull(config.getIncludeEntities()) ) {
request.setIncludeEntities(config.getIncludeEntities().toString());
}
request.setLang(config.getLang());
request.setLocale(config.getLocale());
if( !Objects.isNull(config.getResultType())) {
request.setResultType(config.getResultType());
}
streamsConfiguration = StreamsConfigurator.detectConfiguration();
try {
client = getTwitterClient();
} catch (InstantiationException e) {
LOGGER.error("InstantiationException", e);
}
Objects.requireNonNull(client);
executor = MoreExecutors.listeningDecorator(
ExecutorUtils.newFixedThreadPoolWithQueueSize(
config.getThreadsPerProvider().intValue(),
streamsConfiguration.getQueueSize().intValue()
)
);
completionService = new ExecutorCompletionService<>(executor);
submitSearchThread();
}