in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [505:551]
  def poll(): Unit = {
    try {
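      // take a snapshot of the current assignment and send any offset commits aggregated
      // since the last poll before driving the consumer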
      val currentAssignmentsJava = consumer.assignment()
      commitAggregatedOffsets()
      if (requests.isEmpty) {
        // no outstanding requests so we don't expect any messages back, but we should anyway
        // drive the KafkaConsumer by polling
        def checkNoResult(rawResult: ConsumerRecords[K, V]): Unit =
          if (!rawResult.isEmpty)
            throw new IllegalStateException(s"Got ${rawResult.count} unexpected messages")
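        // with no requests outstanding, pause all assigned partitions so the poll below only
        // drives the consumer (e.g. rebalances and async commit callbacks) without fetching records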
        consumer.pause(currentAssignmentsJava)
        checkNoResult(consumer.poll(java.time.Duration.ZERO))

        // COMMIT PERFORMANCE OPTIMIZATION
        // For commits we try to avoid blocking poll because a commit normally succeeds after a few
        // poll(0). Using poll(1) will always block for 1 ms, since there are no messages.
        // Therefore we do 10 poll(0) with short 10 μs delay followed by 1 poll(1).
        // If it's still not completed it will be tried again after the scheduled Poll.
        var i = 10
        while (i > 0 && commitsInProgress > 0) {
          LockSupport.parkNanos(10 * 1000)
          val pollTimeout = if (i == 1) oneMilli else java.time.Duration.ZERO
          checkNoResult(consumer.poll(pollTimeout))
          i -= 1
        }
      } else {
        // resume partitions to fetch
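        // (assigned partitions that no outstanding request asks for stay paused, so they are not fetched from)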
        val partitionsToFetch: Set[TopicPartition] = requests.values.flatMap(_.tps).toSet
        val (resumeThese, pauseThese) = currentAssignmentsJava.asScala.partition(partitionsToFetch.contains)
        consumer.pause(pauseThese.asJava)
        consumer.resume(resumeThese.asJava)
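        // poll with the regular poll timeout and hand the fetched records to the requestors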
        processResult(partitionsToFetch, consumer.poll(pollTimeout))
      }
    } catch {
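      // a deserialization failure is passed to processErrors but does not stop the actor,
      // any other non-fatal error stops the actor after reporting it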
      case e: org.apache.kafka.common.errors.SerializationException =>
        processErrors(e)
      case NonFatal(e) =>
        processErrors(e)
        log.error(e, "Exception when polling from consumer, stopping actor: {}", e.toString)
        context.stop(self)
    }
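    // complete a pending stop only once all in-flight commits have finished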
    if (stopInProgress && commitsInProgress == 0) {
      log.debug("Stopping")
      context.stop(self)
    }
  }