in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala [151:182]
/**
 * Registers the received record with `inFlightRecords`, keyed by the record's
 * topic-partition and carrying the record's offset.
 *
 * @param rec the consumer record that was just received
 */
override def onMessage(rec: ConsumerRecord[K, V]): Unit = {
  val partition = new TopicPartition(rec.topic(), rec.partition())
  inFlightRecords.add(Map(partition -> rec.offset()))
}
/**
 * Asks the source actor to drain before the consumer actor is stopped.
 *
 * Sends a `Drain` message to `sourceActor.ref` (with itself as sender) carrying the
 * currently assigned in-flight partitions, the consumer actor, and the
 * `StopFromStage(id)` message.
 * NOTE(review): presumably the `Drain` handler forwards `StopFromStage` to the
 * consumer actor once draining completes -- confirm against the `Drain` handling code.
 */
override protected def stopConsumerActor(): Unit = {
  val target = sourceActor.ref
  val drainRequest = Drain(
    inFlightRecords.assigned(),
    Some(consumerActor),
    KafkaConsumerActor.Internal.StopFromStage(id))
  target.tell(drainRequest, sourceActor.ref)
}
/**
 * Chains the given handler with one that waits for draining whenever partitions
 * are revoked or lost.
 *
 * @param handler the partition assignment handler to run first in the chain
 * @return a `PartitionAssignmentHelpers.Chain` of `handler` followed by the draining handler
 */
override protected def addToPartitionAssignmentHandler(
    handler: PartitionAssignmentHandler): PartitionAssignmentHandler = {
  val drainOnRevoke = new PartitionAssignmentHandler {
    // Nothing to do when partitions are assigned.
    override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()

    // This is invoked in the KafkaConsumerActor thread when doing poll.
    override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = {
      val drained = waitForDraining(revokedTps)
      if (!drained) {
        // Draining did not complete: fail the source first, then stop the consumer actor.
        sourceActor.ref.tell(Failure(new Error("Timeout while draining")), consumerActor)
        consumerActor.tell(KafkaConsumerActor.Internal.StopFromStage(id), consumerActor)
      } else {
        // Draining completed: tell the source actor which partitions were revoked.
        sourceActor.ref.tell(Revoked(revokedTps.toList), consumerActor)
      }
    }

    // Lost partitions go through the same path as revoked ones.
    override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
      onRevoke(lostTps, consumer)

    // Nothing to do on stop.
    override def onStop(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
  }
  new PartitionAssignmentHelpers.Chain(handler, drainOnRevoke)
}