in core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala [74:85]
/**
 * Reacts to the messages this stage logic understands; the `ActorRef` half of each pair is ignored.
 *
 *  - `KafkaConsumerActor.Internal.Messages`: clears `requested` when the reply's `requestId`
 *    equals the current `requestId`, appends the delivered messages to `buffer`, then calls `pump()`.
 *  - `Status.Failure`: fails the stage with the wrapped throwable.
 *  - `Terminated` for `consumerActor`: fails the stage with a new `ConsumerFailed`.
 *
 * Any other input is left undefined by this partial function.
 */
protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
  case (_, batch: KafkaConsumerActor.Internal.Messages[K @unchecked, V @unchecked]) =>
    // There might be more than one in flight when tps are assigned/revoked, so only a
    // reply whose requestId matches the current one clears the `requested` flag.
    val answersCurrentRequest = batch.requestId == requestId
    if (answersCurrentRequest) {
      requested = false
    }
    // The delivered messages are appended regardless of which request they answer.
    buffer = buffer ++ batch.messages
    pump()

  case (_, Status.Failure(cause)) =>
    failStage(cause)

  case (_, Terminated(stopped)) if stopped == consumerActor =>
    failStage(new ConsumerFailed())
}