public void run()

in twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java [331:402]


    /**
     * Fetch loop for a single topic partition.
     *
     * <p>Keeps running while the {@code running} flag is set. Each iteration:
     * <ol>
     *   <li>acquires a consumer entry through {@code getConsumerEntry()} if none is currently held, backing off
     *       when none is available;</li>
     *   <li>resolves a negative (special) offset into a concrete one through {@code getLastOffset};</li>
     *   <li>fetches messages starting at the current offset and passes them to {@code invokeCallback}.</li>
     * </ol>
     *
     * <p>On an error response or an exception, the consumer for the current broker is refreshed and the held
     * entry is dropped, so the next iteration acquires it again. Once the loop exits,
     * {@code callback.finished()} is invoked and any exception it throws is logged rather than propagated.
     */
    public void run() {
      final AtomicLong offset = new AtomicLong(startOffset);

      // Consumer currently used for fetching; null means it has to be (re)acquired on the next iteration.
      Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null;
      ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF,
                                                          MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS);
      while (running) {
        if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) {
          LOG.debug("No leader for topic partition {}.", topicPart);
          backoff.backoff();
          continue;
        }

        // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
        // from Kafka server.
        // NOTE(review): this call sits outside the try block below, so an exception thrown here would end the
        // thread without reaching callback.finished() - confirm getLastOffset cannot throw.
        long off = offset.get();
        if (off < 0) {
          offset.set(getLastOffset(topicPart, off));
        }

        SimpleConsumer consumer = consumerEntry.getValue();

        // Fire a fetch message request
        try {
          FetchResponse response = fetchMessages(consumer, offset.get());

          // Failure response, set consumer entry to null and let next round of loop to handle it.
          if (response.hasError()) {
            short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition());
            LOG.info("Failed to fetch message on {}. Error: {}", topicPart, errorCode);
            if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
              // If it is out of range error, reset to earliest offset and retry right away; the special offset
              // value gets resolved at the top of the next iteration.
              offset.set(kafka.api.OffsetRequest.EarliestTime());
            } else if (running) {
              // Any other error may well repeat on an immediate retry. Back off, as the no-leader and
              // exception paths do, instead of spinning on the broker and flooding the log.
              // This must happen before consumerEntry is cleared: the catch block dereferences it.
              backoff.backoff();
            }

            consumers.refresh(consumerEntry.getKey());
            consumerEntry = null;
            continue;
          }

          ByteBufferMessageSet messages = response.messageSet(topicPart.getTopic(), topicPart.getPartition());
          if (sleepIfEmpty(messages)) {
            continue;
          }

          // Call the callback
          invokeCallback(messages, offset);
          // Messages were delivered, so the next failure starts from the initial backoff again.
          backoff.reset();
        } catch (Throwable t) {
          // Only log if it is still running, otherwise, it just the interrupt caused by the stop.
          if (!running) {
            LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
          } else {
            if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
              LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart);
            } else {
              LOG.info("Exception when fetching message on {}.", topicPart, t);
            }
            backoff.backoff();
          }
          // consumerEntry is non-null here: the try block is only entered with a held entry and every path
          // that clears it does so as its last step before leaving the try block.
          consumers.refresh(consumerEntry.getKey());
          consumerEntry = null;
        }
      }

      // When the thread is done, call the callback finished method.
      try {
        callback.finished();
      } catch (Throwable t) {
        // NOTE(review): the placeholder is filled with the running flag, which is false whenever the loop has
        // exited normally; topicPart may have been the intended argument - confirm before changing.
        LOG.error("Exception thrown from MessageCallback.finished({})", running, t);
      }
    }