public Cancellable consume()

in twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java [218:264]


    /**
     * Starts consuming messages for every topic partition present in {@code requests} and delivers them
     * through the given callback.
     *
     * <p>One daemon {@code ConsumerThread} is started per entry in {@code requests}. The callback is wrapped
     * via {@code wrapCallback} together with a dedicated single thread executor before being handed to the
     * polling threads. The returned {@link Cancellable} is also registered in {@code consumerCancels}.
     *
     * @param callback the callback to deliver consumed messages to
     * @return a {@link Cancellable} that terminates all polling threads, waits for them to exit and then
     *         shuts down the callback executor; calling {@code cancel()} more than once has no further effect
     */
    public Cancellable consume(MessageCallback callback) {
      final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
      final List<ConsumerThread> pollers = Lists.newArrayList();

      // When cancelling the consumption, first terminates all polling threads and then stop the executor service.
      final AtomicBoolean cancelled = new AtomicBoolean();
      Cancellable cancellable = new Cancellable() {
        @Override
        public void cancel() {
          // Only the first caller performs the shutdown sequence; later calls return immediately.
          if (!cancelled.compareAndSet(false, true)) {
            return;
          }
          consumerCancels.remove(this);

          LOG.info("Requesting stop of all consumer threads.");
          for (ConsumerThread consumerThread : pollers) {
            consumerThread.terminate();
          }
          LOG.info("Wait for all consumer threads to stop.");
          // Remember an interrupt instead of re-asserting it right away: setting the flag inside the loop
          // would make every subsequent join() throw immediately and skip waiting for the remaining threads.
          boolean interrupted = false;
          for (ConsumerThread consumerThread : pollers) {
            try {
              consumerThread.join();
            } catch (InterruptedException e) {
              interrupted = true;
              LOG.warn("Interrupted exception while waiting for thread to complete.", e);
            }
          }
          LOG.info("All consumer threads stopped.");
          // Use shutdown so that submitted task still has chance to execute, which is important for finished to get
          // called.
          executor.shutdown();

          // Restore the interrupt status so the caller of cancel() can still observe the interruption.
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      };

      // Wrap the callback with a single thread executor.
      // NOTE(review): the cancellable is handed out here, before any poller is added to the list and before it
      // is registered in consumerCancels. If cancel() can be triggered through the wrapped callback while the
      // loop below is still running, pollers may be read while being modified -- confirm against wrapCallback.
      MessageCallback messageCallback = wrapCallback(callback, executor, cancellable);

      // Starts threads for polling new messages.
      for (Map.Entry<TopicPartition, Long> entry : requests.entrySet()) {
        ConsumerThread consumerThread = new ConsumerThread(entry.getKey(), entry.getValue(), messageCallback);
        consumerThread.setDaemon(true);
        consumerThread.start();
        pollers.add(consumerThread);
      }

      consumerCancels.add(cancellable);
      return cancellable;
    }