in core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala [430:439]
protected def messageHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (_, msg: KafkaConsumerActor.Internal.Messages[K @unchecked, V @unchecked]) =>
requested = false
buffer = buffer ++ msg.messages
pump()
case (_, Status.Failure(e)) =>
failStage(e)
case (_, Terminated(ref)) if ref == consumerActor =>
failStage(new ConsumerFailed)
}