in twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java [218:264]
public Cancellable consume(MessageCallback callback) {
final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
final List<ConsumerThread> pollers = Lists.newArrayList();
// When cancelling the consumption, first terminates all polling threads and then stop the executor service.
final AtomicBoolean cancelled = new AtomicBoolean();
Cancellable cancellable = new Cancellable() {
@Override
public void cancel() {
if (!cancelled.compareAndSet(false, true)) {
return;
}
consumerCancels.remove(this);
LOG.info("Requesting stop of all consumer threads.");
for (ConsumerThread consumerThread : pollers) {
consumerThread.terminate();
}
LOG.info("Wait for all consumer threads to stop.");
for (ConsumerThread consumerThread : pollers) {
try {
consumerThread.join();
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting for thread to complete.", e);
}
}
LOG.info("All consumer threads stopped.");
// Use shutdown so that submitted task still has chance to execute, which is important for finished to get
// called.
executor.shutdown();
}
};
// Wrap the callback with a single thread executor.
MessageCallback messageCallback = wrapCallback(callback, executor, cancellable);
// Starts threads for polling new messages.
for (Map.Entry<TopicPartition, Long> entry : requests.entrySet()) {
ConsumerThread consumerThread = new ConsumerThread(entry.getKey(), entry.getValue(), messageCallback);
consumerThread.setDaemon(true);
consumerThread.start();
pollers.add(consumerThread);
}
consumerCancels.add(cancellable);
return cancellable;
}