public void prepare(Object configurationObject)

in streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java [175:253]


  /**
   * Prepares this provider: resolves and validates configuration, builds the Twitter client
   * and the provider queue, splits the configured identifiers into numeric ids and screen
   * names, and submits one task per identifier to a newly created executor.
   *
   * <p>All configuration checks (including the endpoint check) run before the client, queue
   * or executor are created, so an invalid configuration fails without leaving a thread pool
   * behind.
   *
   * @param configurationObject if it is a {@link TwitterFollowingConfiguration} it replaces the
   *     current {@code config}; any other value (including {@code null}) is ignored and the
   *     previously set {@code config} is used
   * @throws NullPointerException if the configuration, any required OAuth value, the info
   *     list, the thread count, the endpoint, the client or the provider queue is {@code null}
   * @throws IllegalArgumentException if the endpoint is neither "friends" nor "followers"
   */
  public void prepare(Object configurationObject) {

    // Detect once and reuse the result for both the field and the executor sizing below.
    final StreamsConfiguration detectedConfiguration = StreamsConfigurator.detectConfiguration();
    this.streamsConfiguration = detectedConfiguration;

    if (configurationObject instanceof TwitterFollowingConfiguration) {
      this.config = (TwitterFollowingConfiguration) configurationObject;
    }

    // Fail fast on incomplete configuration, before any resource is acquired.
    Objects.requireNonNull(config, "config");
    Objects.requireNonNull(config.getOauth(), "config.oauth");
    Objects.requireNonNull(config.getOauth().getConsumerKey(), "config.oauth.consumerKey");
    Objects.requireNonNull(config.getOauth().getConsumerSecret(), "config.oauth.consumerSecret");
    Objects.requireNonNull(config.getOauth().getAccessToken(), "config.oauth.accessToken");
    Objects.requireNonNull(config.getOauth().getAccessTokenSecret(), "config.oauth.accessTokenSecret");
    Objects.requireNonNull(config.getInfo(), "config.info");
    Objects.requireNonNull(config.getThreadsPerProvider(), "config.threadsPerProvider");
    Objects.requireNonNull(getConfig().getEndpoint(), "config.endpoint");
    Preconditions.checkArgument(
        getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"),
        "endpoint must be \"friends\" or \"followers\"");

    try {
      client = getTwitterClient();
    } catch (InstantiationException e) {
      // Logged here; a client that is still null is rejected by the check just below.
      LOGGER.error("InstantiationException", e);
    }

    Objects.requireNonNull(client, "client");

    // Acquire the lock outside the try so that finally only ever releases a lock we hold.
    lock.writeLock().lock();
    try {
      providerQueue = QueueUtils.constructQueue();
    } finally {
      lock.writeLock().unlock();
    }

    Objects.requireNonNull(providerQueue, "providerQueue");

    // TODO: abstract this out
    for (String s : config.getInfo()) {
      if (s != null) {
        // Locale.ROOT keeps ASCII screen names stable regardless of the JVM default locale.
        String potentialScreenName = s.replace("@", "").trim().toLowerCase(java.util.Locale.ROOT);

        // A value that parses as a long is treated as a user id; anything else is a screen name.
        try {
          ids.add(Long.parseLong(potentialScreenName));
        } catch (NumberFormatException notNumeric) {
          names.add(potentialScreenName);
        }
      }
    }

    executor = MoreExecutors.listeningDecorator(
      ExecutorUtils.newFixedThreadPoolWithQueueSize(
            config.getThreadsPerProvider().intValue(),
            detectedConfiguration.getQueueSize().intValue()
        )
    );

    // One task per numeric user id ...
    for (Long id : ids) {
      Callable<Object> callable = createTask(id, null);
      LOGGER.info("Thread Created: {}", id);
      tasks.add(callable);
      futures.add(executor.submit(callable));
      LOGGER.info("Thread Submitted: {}", id);
    }

    // ... and one task per screen name.
    for (String name : names) {
      Callable<Object> callable = createTask(null, name);
      LOGGER.info("Thread Created: {}", name);
      tasks.add(callable);
      futures.add(executor.submit(callable));
      LOGGER.info("Thread Submitted: {}", name);
    }

  }