def checkOverlappingRequests()

in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [378:388]


  /**
   * Warns about, and cancels, pending requests of other stages that overlap with `topics`.
   *
   * Each entry of `requests` whose requester differs from `fromStage` and whose `tps` contains at
   * least one element of `topics` is handled as follows: a warning is logged, the requester is sent
   * an empty `Messages` carrying that entry's own `requestId`, and the entry is dropped from `requests`.
   *
   * Overlapping requests hint that something is wrong, although they can be legitimate while
   * assignments are changing.
   *
   * @param updateType label passed as the first argument of the warning log line
   * @param fromStage  requester whose own entry is never treated as overlapping
   * @param topics     topic/partitions compared against the pending requests
   */
  def checkOverlappingRequests(updateType: String, fromStage: ActorRef, topics: Set[TopicPartition]): Unit =
    for {
      (otherStage, pending) <- requests
      if otherStage != fromStage && pending.tps.exists(topics.contains)
    } {
      // NOTE(review): the last placeholder receives the other stage's partitions (pending.tps),
      // not its ActorRef — confirm this is the intended log content.
      log.warning("{} from topic/partition {} already requested by other stage {}", updateType, topics, pending.tps)
      // Answer the displaced request with an empty batch before forgetting about it.
      otherStage ! Messages(pending.requestId, Iterator.empty)
      requests -= otherStage
    }