in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [615:645]
/**
 * Hands the records of a non-empty poll result to the stage actors that have a pending request.
 *
 * The result is first validated against the partitions that were requested, then passed through
 * `resetProtection` and reported to `progressTracker`. Each pending request whose partitions
 * yielded at least one record receives a `Messages` reply and is removed from `requests`;
 * requests that got no records stay pending. An empty `rawResult` is ignored entirely.
 *
 * @param partitionsToFetch the partitions records were requested for
 * @param rawResult the records returned by the consumer poll
 * @throws scala.IllegalArgumentException if `rawResult` contains a partition that is not in
 *                                        `partitionsToFetch`
 */
private def processResult(partitionsToFetch: Set[TopicPartition], rawResult: ConsumerRecords[K, V]): Unit =
  if (!rawResult.isEmpty) {
    // check that we got only requested partitions and did not drop any messages
    val gotUnrequestedPartition =
      rawResult.partitions().asScala.exists(tp => !partitionsToFetch.contains(tp))
    if (gotUnrequestedPartition)
      throw new scala.IllegalArgumentException(
        s"Unexpected records polled. Expected: $partitionsToFetch, " +
        s"result: ${rawResult.partitions()}, consumer assignment: ${consumer.assignment()}")

    val protectedRecords = resetProtection.protect(self, rawResult)
    progressTracker.received(protectedRecords)

    // answer every pending request that has records available
    requests.foreach {
      case (stageActorRef, req) =>
        // Collect the records of all partitions this request asked for into a single Vector.
        // See https://github.com/akka/alpakka-kafka/issues/978
        // Temporary fix to avoid https://github.com/scala/bug/issues/11807
        // Iterating a materialized Vector (`VectorIterator`) avoids the error from `ConcatIterator`
        val collected = Vector.newBuilder[ConsumerRecord[K, V]]
        for (tp <- req.tps)
          collected ++= protectedRecords.records(tp).asScala
        val batch = collected.result()

        // only reply (and drop the request) when there is something to deliver
        if (batch.nonEmpty) {
          stageActorRef ! Messages(req.requestId, batch.iterator)
          requests -= stageActorRef
        }
    }
  }