private MessageCallback wrapCallback()

in twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java [271:307]


    private MessageCallback wrapCallback(final MessageCallback callback,
                                         final ExecutorService executor, final Cancellable cancellable) {
      final AtomicBoolean stopped = new AtomicBoolean();
      return new MessageCallback() {
        @Override
        public void onReceived(final Iterator<FetchedMessage> messages) {
          if (stopped.get()) {
            return;
          }
          Futures.getUnchecked(executor.submit(new Runnable() {
            @Override
            public void run() {
              if (stopped.get()) {
                return;
              }
              callback.onReceived(messages);
            }
          }));
        }

        @Override
        public void finished() {
          // Make sure finished only get called once.
          if (!stopped.compareAndSet(false, true)) {
            return;
          }
          Futures.getUnchecked(executor.submit(new Runnable() {
            @Override
            public void run() {
              // When finished is called, also cancel the consumption from all polling thread.
              callback.finished();
              cancellable.cancel();
            }
          }));
        }
      };
    }