in twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaConsumer.java [331:402]
public void run() {
final AtomicLong offset = new AtomicLong(startOffset);
Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null;
ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF,
MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS);
while (running) {
if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) {
LOG.debug("No leader for topic partition {}.", topicPart);
backoff.backoff();
continue;
}
// If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
// from kafak server.
long off = offset.get();
if (off < 0) {
offset.set(getLastOffset(topicPart, off));
}
SimpleConsumer consumer = consumerEntry.getValue();
// Fire a fetch message request
try {
FetchResponse response = fetchMessages(consumer, offset.get());
// Failure response, set consumer entry to null and let next round of loop to handle it.
if (response.hasError()) {
short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition());
LOG.info("Failed to fetch message on {}. Error: {}", topicPart, errorCode);
// If it is out of range error, reset to earliest offset
if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
offset.set(kafka.api.OffsetRequest.EarliestTime());
}
consumers.refresh(consumerEntry.getKey());
consumerEntry = null;
continue;
}
ByteBufferMessageSet messages = response.messageSet(topicPart.getTopic(), topicPart.getPartition());
if (sleepIfEmpty(messages)) {
continue;
}
// Call the callback
invokeCallback(messages, offset);
backoff.reset();
} catch (Throwable t) {
// Only log if it is still running, otherwise, it just the interrupt caused by the stop.
if (!running) {
LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
} else {
if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart);
} else {
LOG.info("Exception when fetching message on {}.", topicPart, t);
}
backoff.backoff();
}
consumers.refresh(consumerEntry.getKey());
consumerEntry = null;
}
}
// When the thread is done, call the callback finished method.
try {
callback.finished();
} catch (Throwable t) {
LOG.error("Exception thrown from MessageCallback.finished({})", running, t);
}
}