in twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java [271:307]
/**
 * Wraps the given {@link MessageCallback} so that each call is executed as a task submitted to
 * the given executor, with the calling thread waiting for that task to complete.
 *
 * <p>The returned callback guarantees that the delegate's {@code finished()} is invoked at most
 * once, and that no messages are forwarded to the delegate once {@code finished()} has been
 * requested. After the delegate's {@code finished()} has run, the given {@link Cancellable} is
 * cancelled, even if the delegate's {@code finished()} throws.
 *
 * @param callback the callback to delegate to
 * @param executor the executor on which the delegate's methods are invoked
 * @param cancellable cancelled once {@code finished()} has been handled
 * @return a callback that forwards to {@code callback} through {@code executor}
 */
private MessageCallback wrapCallback(final MessageCallback callback,
                                     final ExecutorService executor, final Cancellable cancellable) {
  // Flipped to true by the first finished() call; checked before any message is delivered.
  final AtomicBoolean stopped = new AtomicBoolean();
  return new MessageCallback() {
    @Override
    public void onReceived(final Iterator<FetchedMessage> messages) {
      // Fast path: don't even submit a task if finished() was already requested.
      if (stopped.get()) {
        return;
      }
      // Wait for the task so the caller only proceeds after the delegate has handled the batch.
      Futures.getUnchecked(executor.submit(new Runnable() {
        @Override
        public void run() {
          // Re-check inside the task: finished() may have been requested between the
          // submission above and the moment this task actually runs.
          if (stopped.get()) {
            return;
          }
          callback.onReceived(messages);
        }
      }));
    }

    @Override
    public void finished() {
      // Make sure finished only gets called once.
      if (!stopped.compareAndSet(false, true)) {
        return;
      }
      Futures.getUnchecked(executor.submit(new Runnable() {
        @Override
        public void run() {
          try {
            callback.finished();
          } finally {
            // When finished is called, also cancel the consumption from all polling threads.
            // This is done in a finally block because the stopped flag is already set: if the
            // delegate threw and we skipped the cancel, nothing would ever cancel it later.
            cancellable.cancel();
          }
        }
      }));
    }
  };
}