in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [378:388]
/**
 * Looks for pending requests from stages other than `fromStage` that cover at least one of the
 * given topic partitions. Such an overlap hints that something is wrong, although it can be
 * legitimate while assignments are changing.
 *
 * Every overlapping request is logged as a warning, answered with an empty `Messages` reply
 * carrying its own request id, and then dropped from `requests`.
 *
 * @param updateType label placed at the start of the warning message
 * @param fromStage  the stage issuing the current update; its own pending request is never touched
 * @param topics     the topic partitions addressed by the current update
 */
def checkOverlappingRequests(updateType: String, fromStage: ActorRef, topics: Set[TopicPartition]): Unit =
  for ((otherStage, pending) <- requests
    if otherStage != fromStage && pending.tps.exists(topics.contains)) {
    // NOTE(review): the last placeholder is filled with the other stage's partitions rather
    // than its ActorRef, although the text reads "other stage {}" — confirm this is intended.
    log.warning("{} from topic/partition {} already requested by other stage {}", updateType, topics, pending.tps)
    // Complete the displaced request with no records before forgetting it.
    otherStage ! Messages(pending.requestId, Iterator.empty)
    requests -= otherStage
  }