in src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java [132:174]
private void consume() throws InterruptedException {
try {
while (!closed.get()) {
if (resetOffset.getAndSet(false)) {
// Make sure there is an assignment for this consumer
while (consumer.assignment().isEmpty() && !closed.get()) {
logger.atInfo().log(
"Resetting offset: no partitions assigned to the consumer, request assignment.");
consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
}
consumer.seekToBeginning(consumer.assignment());
}
ConsumerRecords<byte[], byte[]> consumerRecords =
consumer.poll(Duration.ofMillis(configuration.getPollingInterval()));
consumerRecords.forEach(
consumerRecord -> {
try (ManualRequestContext ctx = oneOffCtx.open()) {
Event event =
valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value());
messageProcessor.accept(event);
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Malformed event '%s': [Exception: %s]",
new String(consumerRecord.value(), UTF_8));
subscriberMetrics.incrementSubscriberFailedToConsumeMessage();
}
});
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) {
logger.atSevere().withCause(e).log("Consumer loop of topic %s interrupted", topic);
reconnectAfterFailure();
}
} catch (Exception e) {
subscriberMetrics.incrementSubscriberFailedToPollMessages();
logger.atSevere().withCause(e).log(
"Existing consumer loop of topic %s because of a non-recoverable exception", topic);
reconnectAfterFailure();
} finally {
consumer.close();
}
}