private def processResult()

in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [615:645]


  /**
   * Validates a non-empty poll result and hands its records to the stage actors with pending requests.
   *
   * An empty `rawResult` is ignored. Otherwise the records are passed through `resetProtection`,
   * reported to `progressTracker`, and every entry in `requests` whose partitions yielded at least
   * one record is sent a `Messages` reply and removed from `requests`.
   *
   * @param partitionsToFetch the partitions records were expected for
   * @param rawResult the polled records
   * @throws scala.IllegalArgumentException if `rawResult` contains a partition that is not in `partitionsToFetch`
   */
  private def processResult(partitionsToFetch: Set[TopicPartition], rawResult: ConsumerRecords[K, V]): Unit =
    if (!rawResult.isEmpty) {
      // make sure we only got partitions we asked for, so that no messages are dropped
      val gotUnrequestedPartition =
        rawResult.partitions().asScala.exists(tp => !partitionsToFetch.contains(tp))
      if (gotUnrequestedPartition)
        throw new scala.IllegalArgumentException(
          s"Unexpected records polled. Expected: $partitionsToFetch, " +
          s"result: ${rawResult.partitions()}, consumer assignment: ${consumer.assignment()}")

      val protectedRecords = resetProtection.protect(self, rawResult)
      progressTracker.received(protectedRecords)

      // reply to every pending request that has records available
      requests.foreach {
        case (requester, pending) =>
          // Collect the records of all requested partitions into a Vector on purpose:
          // see https://github.com/akka/alpakka-kafka/issues/978
          // This is a temporary fix to avoid https://github.com/scala/bug/issues/11807,
          // handing out a `VectorIterator` avoids the error from `ConcatIterator`.
          val collected = Vector.newBuilder[ConsumerRecord[K, V]]
          pending.tps.foreach(tp => collected ++= protectedRecords.records(tp).asScala)
          val batch = collected.result()
          if (batch.nonEmpty) {
            requester ! Messages(pending.requestId, batch.iterator)
            // the request is fulfilled, stop tracking it
            requests -= requester
          }
      }
    }