protected def messageHandling: PartialFunction[()

in core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala [74:85]


  protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
    case (_, msg: KafkaConsumerActor.Internal.Messages[K @unchecked, V @unchecked]) =>
      // might be more than one in flight when we assign/revoke tps
      if (msg.requestId == requestId)
        requested = false
      buffer = buffer ++ msg.messages
      pump()
    case (_, Status.Failure(e)) =>
      failStage(e)
    case (_, Terminated(ref)) if ref == consumerActor =>
      failStage(new ConsumerFailed())
  }